mirror of https://github.com/apache/druid.git
Numerous bug fixes and improvements in response to code review
parent 3417c50e98
commit 344a2b5d24

@@ -39,8 +39,6 @@ public class SpatialDimFilter implements DimFilter
)
{
Preconditions.checkArgument(dimension != null, "dimension must not be null");
//FIXME
Preconditions.checkArgument(dimension.endsWith(".geo"), "must filter over geo dimension!");
Preconditions.checkArgument(bound != null, "bound must not be null");

this.dimension = dimension;

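With the ".geo" suffix precondition gone, a SpatialDimFilter can name any spatial dimension. A minimal usage sketch (illustrative only; the RadiusBound center/radius constructor is assumed here and is not shown in this diff):

    // Match rows whose spatial dimension lies within radius 5 of the origin;
    // the dimension name no longer has to end in ".geo".
    DimFilter filter = new SpatialDimFilter(
        "dim.geo",
        new RadiusBound(new float[]{0.0f, 0.0f}, 5.0f)
    );
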
@@ -149,17 +149,16 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
GenericIndexed<ImmutableConciseSet> bitmaps = GenericIndexed.read(
buffer, ConciseCompressedIndexedInts.objectStrategy
);
builder.setBitmapIndex(new BitmapIndexColumnPartSupplier(bitmaps, dictionary));

ImmutableRTree spatialIndex = null;
if (buffer.hasRemaining()) {
spatialIndex = ByteBufferSerializer.read(
buffer, IndexedRTree.objectStrategy
);
builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(spatialIndex));
}

builder.setBitmapIndex(new BitmapIndexColumnPartSupplier(bitmaps, dictionary));
builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(spatialIndex));

return new DictionaryEncodedColumnPartSerde(
dictionary,
singleValuedColumn,

@@ -25,7 +25,6 @@ import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

@@ -38,11 +37,6 @@ import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.Aggregator;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.post.PostAggregator;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.index.column.ComplexColumn;
import com.metamx.druid.index.column.DictionaryEncodedColumn;
import com.metamx.druid.index.column.GenericColumn;
import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.index.v1.serde.ComplexMetricExtractor;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics;

@@ -58,7 +52,6 @@ import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;

@@ -84,6 +77,7 @@ public class IncrementalIndex implements Iterable<Row>
private final ImmutableList<String> metricNames;
private final LinkedHashMap<String, Integer> dimensionOrder;
private final CopyOnWriteArrayList<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
private final SpatialDimensionRowFormatter spatialDimensionRowFormatter;
private final DimensionHolder dimValues;
private final ConcurrentSkipListMap<TimeAndDims, Aggregator[]> facts;

@@ -119,7 +113,8 @@ public class IncrementalIndex implements Iterable<Row>
dimensionOrder.put(dim, index++);
dimensions.add(dim);
}
this.spatialDimensionRowFormatter = new SpatialDimensionRowFormatter(incrementalIndexSchema.getSpatialDimensions());
this.spatialDimensions = incrementalIndexSchema.getSpatialDimensions();
this.spatialDimensionRowFormatter = new SpatialDimensionRowFormatter(spatialDimensions);

this.dimValues = new DimensionHolder();
this.facts = new ConcurrentSkipListMap<TimeAndDims, Aggregator[]>();

@@ -356,6 +351,11 @@ public class IncrementalIndex implements Iterable<Row>
return dimensions;
}

public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}

public String getMetricType(String metric)
{
return metricTypes.get(metric);

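Spatial dimensions are declared on the index schema and exposed through the new getSpatialDimensions() getter. A hypothetical declaration, for context (the SpatialDimensionSchema constructor arguments are inferred from its getters and are not shown in this diff):

    // Hypothetical: fold the "lat" and "long" input fields into one spatial dimension "dim.geo".
    SpatialDimensionSchema spatialDim = new SpatialDimensionSchema(
        "dim.geo",                     // name of the materialized spatial dimension
        Arrays.asList("lat", "long")   // input fields joined into its coordinate value
    );
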
@@ -24,7 +24,6 @@ import com.google.common.collect.Maps;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import com.metamx.druid.aggregation.Aggregator;
import com.metamx.druid.input.Row;
import com.metamx.druid.kv.EmptyIndexedInts;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedInts;

@@ -63,9 +62,6 @@ public class IncrementalIndexAdapter implements IndexableAdapter
}

int rowNum = 0;
for (Row row : index) {

}
for (IncrementalIndex.TimeAndDims timeAndDims : index.getFacts().keySet()) {
final String[][] dims = timeAndDims.getDims();

@@ -211,11 +207,16 @@ public class IncrementalIndexAdapter implements IndexableAdapter
metrics[i] = aggs[i].get();
}

Map<String, String> description = Maps.newHashMap();
for (SpatialDimensionSchema spatialDimensionSchema : index.getSpatialDimensions()) {
description.put(spatialDimensionSchema.getDimName(), "spatial");
}
return new Rowboat(
timeAndDims.getTimestamp(),
dims,
metrics,
count++
count++,
description
);
}
}

@@ -334,7 +334,6 @@ public class IndexIO
Map<String, GenericIndexed<String>> dimValueLookups = Maps.newHashMap();
Map<String, VSizeIndexed> dimColumns = Maps.newHashMap();
Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexed = Maps.newHashMap();
Map<String, ImmutableRTree> spatialIndexed = Maps.newHashMap();

for (String dimension : IndexedIterable.create(availableDimensions)) {
ByteBuffer dimBuffer = smooshedFiles.mapFile(makeDimFile(inDir, dimension).getName());

@@ -358,6 +357,7 @@ public class IndexIO
);
}

Map<String, ImmutableRTree> spatialIndexed = Maps.newHashMap();
ByteBuffer spatialBuffer = smooshedFiles.mapFile("spatial.drd");
while (spatialBuffer != null && spatialBuffer.hasRemaining()) {
spatialIndexed.put(

@@ -36,7 +36,6 @@ import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.search.RadiusBound;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.IAE;
import com.metamx.common.ISE;

@@ -55,8 +54,8 @@ import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics;
import com.metamx.druid.kv.ByteBufferWriter;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.IOPeon;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedInts;

@@ -575,7 +574,13 @@ public class IndexMerger
j++;
}

return new Rowboat(input.getTimestamp(), newDims, newMetrics, input.getRowNum());
return new Rowboat(
input.getTimestamp(),
newDims,
newMetrics,
input.getRowNum(),
input.getDescriptions()
);
}
}
)

@@ -628,6 +633,7 @@ public class IndexMerger
rowNumConversions.add(IntBuffer.wrap(arr));
}

final Map<String, String> descriptions = Maps.newHashMap();
for (Rowboat theRow : theRows) {
progress.progress();
timeWriter.add(theRow.getTimestamp());

@@ -662,6 +668,8 @@ public class IndexMerger
);
time = System.currentTimeMillis();
}

descriptions.putAll(theRow.getDescriptions());
}

for (IntBuffer rowNumConversion : rowNumConversions) {

@@ -759,7 +767,7 @@ public class IndexMerger
for (int i = 0; i < mergedDimensions.size(); ++i) {
String dimension = mergedDimensions.get(i);

if (!dimension.endsWith(".geo")) {
if (!"spatial".equals(descriptions.get(dimension))) {
continue;
}

@@ -1073,7 +1081,13 @@ public class IndexMerger
}
}

final Rowboat retVal = new Rowboat(input.getTimestamp(), newDims, input.getMetrics(), input.getRowNum());
final Rowboat retVal = new Rowboat(
input.getTimestamp(),
newDims,
input.getMetrics(),
input.getRowNum(),
input.getDescriptions()
);

retVal.addRow(indexNumber, input.getRowNum());

@@ -1152,7 +1166,8 @@ public class IndexMerger
lhs.getTimestamp(),
lhs.getDims(),
metrics,
lhs.getRowNum()
lhs.getRowNum(),
lhs.getDescriptions()
);

for (Rowboat rowboat : Arrays.asList(lhs, rhs)) {

@@ -35,159 +35,163 @@ import java.util.NoSuchElementException;
/**
*/
public class MMappedIndexAdapter implements IndexableAdapter
{
private final MMappedIndex index;
private final int numRows;

public MMappedIndexAdapter(MMappedIndex index)
{
private final MMappedIndex index;
private final int numRows;
this.index = index;

public MMappedIndexAdapter(MMappedIndex index)
{
this.index = index;

numRows = index.getReadOnlyTimestamps().size();
}

@Override
public Interval getDataInterval()
{
return index.getDataInterval();
}

@Override
public int getNumRows()
{
return numRows;
}

@Override
public Indexed<String> getAvailableDimensions()
{
return index.getAvailableDimensions();
}

@Override
public Indexed<String> getAvailableMetrics()
{
return index.getAvailableMetrics();
}

@Override
public Indexed<String> getDimValueLookup(String dimension)
{
return index.getDimValueLookup(dimension);
}

@Override
public Iterable<Rowboat> getRows()
{
return new Iterable<Rowboat>()
{
@Override
public Iterator<Rowboat> iterator()
{
return new Iterator<Rowboat>()
{
final IndexedLongs timestamps = index.getReadOnlyTimestamps();
final MetricHolder[] metrics;
final IndexedFloats[] floatMetrics;
final Map<String, Indexed<? extends IndexedInts>> dimensions;

final int numMetrics = index.getAvailableMetrics().size();

int currRow = 0;
boolean done = false;

{
dimensions = Maps.newLinkedHashMap();
for (String dim : index.getAvailableDimensions()) {
dimensions.put(dim, index.getDimColumn(dim));
}

final Indexed<String> availableMetrics = index.getAvailableMetrics();
metrics = new MetricHolder[availableMetrics.size()];
floatMetrics = new IndexedFloats[availableMetrics.size()];
for (int i = 0; i < metrics.length; ++i) {
metrics[i] = index.getMetricHolder(availableMetrics.get(i));
if (metrics[i].getType() == MetricHolder.MetricType.FLOAT) {
floatMetrics[i] = metrics[i].getFloatType();
}
}
}

@Override
public boolean hasNext()
{
final boolean hasNext = currRow < numRows;
if (!hasNext && !done) {
Closeables.closeQuietly(timestamps);
for (IndexedFloats floatMetric : floatMetrics) {
Closeables.closeQuietly(floatMetric);
}
done = true;
}
return hasNext;
}

@Override
public Rowboat next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}

int[][] dims = new int[dimensions.size()][];
int dimIndex = 0;
for (String dim : dimensions.keySet()) {
IndexedInts dimVals = dimensions.get(dim).get(currRow);

int[] theVals = new int[dimVals.size()];
for (int j = 0; j < theVals.length; ++j) {
theVals[j] = dimVals.get(j);
}

dims[dimIndex++] = theVals;
}

Object[] metricArray = new Object[numMetrics];
for (int i = 0; i < metricArray.length; ++i) {
switch (metrics[i].getType()) {
case FLOAT:
metricArray[i] = floatMetrics[i].get(currRow);
break;
case COMPLEX:
metricArray[i] = metrics[i].getComplexType().get(currRow);
}
}

final Rowboat retVal = new Rowboat(timestamps.get(currRow), dims, metricArray, currRow);

++currRow;

return retVal;
}

@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
};
}

@Override
public IndexedInts getInverteds(String dimension, String value)
{
return new ConciseCompressedIndexedInts(index.getInvertedIndex(dimension, value));
}

@Override
public String getMetricType(String metric)
{
MetricHolder holder = index.getMetricHolder(metric);
if (holder == null) {
return null;
}
return holder.getTypeName();
}
numRows = index.getReadOnlyTimestamps().size();
}

@Override
public Interval getDataInterval()
{
return index.getDataInterval();
}

@Override
public int getNumRows()
{
return numRows;
}

@Override
public Indexed<String> getAvailableDimensions()
{
return index.getAvailableDimensions();
}

@Override
public Indexed<String> getAvailableMetrics()
{
return index.getAvailableMetrics();
}

@Override
public Indexed<String> getDimValueLookup(String dimension)
{
return index.getDimValueLookup(dimension);
}

@Override
public Iterable<Rowboat> getRows()
{
return new Iterable<Rowboat>()
{
@Override
public Iterator<Rowboat> iterator()
{
return new Iterator<Rowboat>()
{
final IndexedLongs timestamps = index.getReadOnlyTimestamps();
final MetricHolder[] metrics;
final IndexedFloats[] floatMetrics;
final Map<String, Indexed<? extends IndexedInts>> dimensions;

final int numMetrics = index.getAvailableMetrics().size();

int currRow = 0;
boolean done = false;

{
dimensions = Maps.newLinkedHashMap();
for (String dim : index.getAvailableDimensions()) {
dimensions.put(dim, index.getDimColumn(dim));
}

final Indexed<String> availableMetrics = index.getAvailableMetrics();
metrics = new MetricHolder[availableMetrics.size()];
floatMetrics = new IndexedFloats[availableMetrics.size()];
for (int i = 0; i < metrics.length; ++i) {
metrics[i] = index.getMetricHolder(availableMetrics.get(i));
if (metrics[i].getType() == MetricHolder.MetricType.FLOAT) {
floatMetrics[i] = metrics[i].getFloatType();
}
}
}

@Override
public boolean hasNext()
{
final boolean hasNext = currRow < numRows;
if (!hasNext && !done) {
Closeables.closeQuietly(timestamps);
for (IndexedFloats floatMetric : floatMetrics) {
Closeables.closeQuietly(floatMetric);
}
done = true;
}
return hasNext;
}

@Override
public Rowboat next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}

int[][] dims = new int[dimensions.size()][];
int dimIndex = 0;
for (String dim : dimensions.keySet()) {
IndexedInts dimVals = dimensions.get(dim).get(currRow);

int[] theVals = new int[dimVals.size()];
for (int j = 0; j < theVals.length; ++j) {
theVals[j] = dimVals.get(j);
}

dims[dimIndex++] = theVals;
}

Object[] metricArray = new Object[numMetrics];
for (int i = 0; i < metricArray.length; ++i) {
switch (metrics[i].getType()) {
case FLOAT:
metricArray[i] = floatMetrics[i].get(currRow);
break;
case COMPLEX:
metricArray[i] = metrics[i].getComplexType().get(currRow);
}
}

Map<String, String> descriptions = Maps.newHashMap();
for (String spatialDim : index.getSpatialIndexes().keySet()) {
descriptions.put(spatialDim, "spatial");
}
final Rowboat retVal = new Rowboat(timestamps.get(currRow), dims, metricArray, currRow, descriptions);

++currRow;

return retVal;
}

@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
};
}

@Override
public IndexedInts getInverteds(String dimension, String value)
{
return new ConciseCompressedIndexedInts(index.getInvertedIndex(dimension, value));
}

@Override
public String getMetricType(String metric)
{
MetricHolder holder = index.getMetricHolder(metric);
if (holder == null) {
return null;
}
return holder.getTypeName();
}
}

@@ -50,7 +50,7 @@ import java.util.NoSuchElementException;
import java.util.Set;

/**
*/
*/
public class QueryableIndexIndexableAdapter implements IndexableAdapter
{
private static final Logger log = new Logger(QueryableIndexIndexableAdapter.class);

@@ -74,11 +74,9 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter

if (col == null) {
log.warn("Wtf!? column[%s] didn't exist!?!?!?", dim);
}
else if (col.getDictionaryEncoding() != null) {
} else if (col.getDictionaryEncoding() != null) {
availableDimensions.add(dim);
}
else {
} else {
log.info("No dictionary on dimension[%s]", dim);
}
}

@@ -236,8 +234,7 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
final IndexedInts dimVals;
if (dict.hasMultipleValues()) {
dimVals = dict.getMultiValueRow(currRow);
}
else {
} else {
dimVals = new ArrayBasedIndexedInts(new int[]{dict.getSingleValueRow(currRow)});
}

@@ -253,14 +250,19 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
for (int i = 0; i < metricArray.length; ++i) {
if (metrics[i] instanceof GenericColumn) {
metricArray[i] = ((GenericColumn) metrics[i]).getFloatSingleValueRow(currRow);
}
else if (metrics[i] instanceof ComplexColumn) {
} else if (metrics[i] instanceof ComplexColumn) {
metricArray[i] = ((ComplexColumn) metrics[i]).getRowValue(currRow);
}
}

Map<String, String> descriptions = Maps.newHashMap();
for (String columnName : input.getColumnNames()) {
if (input.getColumn(columnName).getSpatialIndex() != null) {
descriptions.put(columnName, "spatial");
}
}
final Rowboat retVal = new Rowboat(
timestamps.getLongSingleValueRow(currRow), dims, metricArray, currRow
timestamps.getLongSingleValueRow(currRow), dims, metricArray, currRow, descriptions
);

++currRow;

@@ -37,17 +37,21 @@ public class Rowboat implements Comparable<Rowboat>
private final int rowNum;
private final Map<Integer, TreeSet<Integer>> comprisedRows;

private Map<String, String> columnDescriptor;

public Rowboat(
long timestamp,
int[][] dims,
Object[] metrics,
int rowNum
int rowNum,
Map<String, String> columnDescriptor
)
{
this.timestamp = timestamp;
this.dims = dims;
this.metrics = metrics;
this.rowNum = rowNum;
this.columnDescriptor = columnDescriptor;

this.comprisedRows = Maps.newHashMap();
}

@@ -87,6 +91,11 @@ public class Rowboat implements Comparable<Rowboat>
return rowNum;
}

public Map<String, String> getDescriptions()
{
return columnDescriptor;
}

@Override
public int compareTo(Rowboat rhs)
{

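Every Rowboat now carries a per-column descriptor map, which IndexMerger consults to decide which merged dimensions get a spatial index. A rough construction sketch (the values are made up for illustration; only the constructor and getDescriptions() come from this diff):

    // Mark "dim.geo" as spatial so the merger builds an RTree for it,
    // instead of relying on the old ".geo" naming convention.
    Map<String, String> descriptions = Maps.newHashMap();
    descriptions.put("dim.geo", "spatial");

    Rowboat row = new Rowboat(
        new DateTime("2013-01-01").getMillis(),  // timestamp
        new int[][]{{0}, {2}},                   // dictionary-encoded dimension values
        new Object[]{17.0f},                     // metric values
        0,                                       // row number
        descriptions                             // column descriptors
    );
    row.getDescriptions();                       // {dim.geo=spatial}
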
@@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package com.metamx.druid.index.v1;

import com.google.common.base.Function;

@@ -45,22 +64,35 @@ public class SpatialDimensionRowFormatter

public InputRow formatRow(final InputRow row)
{
final Map<String, List<String>> finalDims = Maps.newHashMap();
final Map<String, List<String>> finalDimLookup = Maps.newHashMap();

// remove all spatial dimensions
Set<String> filtered = Sets.filter(
Sets.newHashSet(row.getDimensions()),
new Predicate<String>()
{
@Override
public boolean apply(String input)
{
return !spatialDimNames.contains(input);
}
}
final List<String> finalDims = Lists.newArrayList(
Iterables.filter(
Lists.transform(
row.getDimensions(),
new Function<String, String>()
{
@Override
public String apply(String input)
{
return input.toLowerCase();
}
}
)
,
new Predicate<String>()
{
@Override
public boolean apply(String input)
{
return !spatialDimNames.contains(input);
}
}
)
);
for (String dim : filtered) {
finalDims.put(dim, row.getDimension(dim));
for (String dim : finalDims) {
finalDimLookup.put(dim, row.getDimension(dim));
}

InputRow retVal = new InputRow()

@@ -68,7 +100,7 @@ public class SpatialDimensionRowFormatter
@Override
public List<String> getDimensions()
{
return Lists.newArrayList(finalDims.keySet());
return finalDims;
}

@Override

@@ -80,7 +112,7 @@ public class SpatialDimensionRowFormatter
@Override
public List<String> getDimension(String dimension)
{
return finalDims.get(dimension);
return finalDimLookup.get(dimension);
}

@Override

@@ -99,7 +131,8 @@ public class SpatialDimensionRowFormatter
}
spatialDimVals.addAll(dimVals);
}
finalDims.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals)));
finalDimLookup.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals)));
finalDims.add(spatialDimension.getDimName());
}

return retVal;

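In effect, formatRow() now lowercases and keeps the non-spatial dimensions, then appends each configured spatial dimension whose single value is the joined coordinate string. A rough sketch of the resulting row shape (hypothetical data; a "dim.geo" spatial dimension built from "lat" and "long", and a comma JOINER, are assumed):

    // Hypothetical input row: {dim=foo, lat=1.0, long=3.0}
    InputRow formatted = formatter.formatRow(inputRow);
    formatted.getDimensions();          // [dim, dim.geo] -- lat/long folded into dim.geo
    formatted.getDimension("dim.geo");  // ["1.0,3.0"]    -- assuming JOINER joins on ","
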
@@ -20,6 +20,9 @@ package com.metamx.druid.index.brita;

import com.metamx.collections.spatial.search.Bound;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import it.uniroma3.mat.extendedset.intset.IntSet;

import java.util.Iterator;

/**
*/

@@ -40,7 +43,48 @@ public class SpatialFilter implements Filter
@Override
public ImmutableConciseSet goConcise(final BitmapIndexSelector selector)
{
return ImmutableConciseSet.union(selector.getSpatialIndex(dimension).search(bound));
final Iterator<ImmutableConciseSet> dimValueIndexesIter = selector.getSpatialIndex(dimension).search(bound)
.iterator();
ImmutableConciseSet retVal = ImmutableConciseSet.union(
new Iterable<ImmutableConciseSet>()
{
@Override
public Iterator<ImmutableConciseSet> iterator()
{
return new Iterator<ImmutableConciseSet>()
{
private IntSet.IntIterator iter;

@Override
public boolean hasNext()
{
return dimValueIndexesIter.hasNext() || iter.hasNext();
}

@Override
public ImmutableConciseSet next()
{
if (iter != null && !iter.hasNext()) {
iter = null;
}
if (iter == null) {
ImmutableConciseSet immutableConciseSet = dimValueIndexesIter.next();
iter = immutableConciseSet.iterator();
}
return selector.getConciseInvertedIndex(dimension, iter.next());
}

@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
}
);

return retVal;
}

@Override

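The lazy iterator above feeds one inverted index at a time into the union. A roughly equivalent eager sketch (illustrative only, written against the calls visible in this patch) may make the intent easier to follow:

    // For each concise set of matching dimension-value ids returned by the R-tree search,
    // look up the inverted index of every matched value, then union all of those bitmaps.
    List<ImmutableConciseSet> perValueBitmaps = Lists.newArrayList();
    for (ImmutableConciseSet dimValueIndexes : selector.getSpatialIndex(dimension).search(bound)) {
      IntSet.IntIterator valueIds = dimValueIndexes.iterator();
      while (valueIds.hasNext()) {
        perValueBitmaps.add(selector.getConciseInvertedIndex(dimension, valueIds.next()));
      }
    }
    ImmutableConciseSet result = ImmutableConciseSet.union(perValueBitmaps);
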
@@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package com.metamx.druid.index.brita;

import com.google.common.base.Throwables;

@@ -42,7 +61,7 @@ import java.util.List;
import java.util.Random;

/**
*/
*/
@RunWith(Parameterized.class)
public class SpatialFilterTest
{

@@ -241,7 +260,8 @@ public class SpatialFilterTest
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(0.0f, 0.0f),
"lat", 0.0f,
"long", 0.0f,
"val", 17l
)
)

@@ -253,7 +273,8 @@ public class SpatialFilterTest
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-02").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(1.0f, 3.0f),
"lat", 1.0f,
"long", 3.0f,
"val", 29l
)
)

@@ -265,7 +286,8 @@ public class SpatialFilterTest
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-03").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(4.0f, 2.0f),
"lat", 4.0f,
"long", 2.0f,
"val", 13l
)
)

@@ -277,7 +299,8 @@ public class SpatialFilterTest
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-04").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(7.0f, 3.0f),
"lat", 7.0f,
"long", 3.0f,
"val", 91l
)
)

@@ -289,7 +312,8 @@ public class SpatialFilterTest
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(8.0f, 6.0f),
"lat", 8.0f,
"long", 6.0f,
"val", 47l
)
)

@@ -303,14 +327,11 @@ public class SpatialFilterTest
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp",
new DateTime("2013-01-01").toString(),
"dim",
"boo",
"dim.geo",
Arrays.asList((float) (rand.nextFloat() * 10 + 10.0), (float) (rand.nextFloat() * 10 + 10.0)),
"val",
i
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "boo",
"lat", (float) (rand.nextFloat() * 10 + 10.0),
"long", (float) (rand.nextFloat() * 10 + 10.0),
"val", i
)
)
);

@@ -487,4 +508,4 @@ public class SpatialFilterTest
throw Throwables.propagate(e);
}
}
}
}