Merge remote-tracking branch 'origin/master' into druid-0.7.x

Conflicts:
	processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java
This commit is contained in:
Xavier Léauté 2014-11-12 14:08:51 -08:00
commit 4ac1aaf90e
5 changed files with 161 additions and 449 deletions

View File

@@ -326,7 +326,7 @@ public class PeriodGranularity extends BaseQueryGranularity
@Override
public byte[] cacheKey()
{
return (period.toString() + ":" + chronology.getZone().toString()).getBytes(Charsets.UTF_8);
return (period.toString() + ":" + chronology.getZone().toString() + ":" + origin).getBytes(Charsets.UTF_8);
}
@Override

View File

@@ -154,20 +154,22 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
actualInterval = actualInterval.withEnd(dataInterval.getEnd());
}
final Sequence<Cursor> sequence;
final Offset offset;
if (filter == null) {
sequence = new NoFilterCursorSequenceBuilder(index, actualInterval, gran).build();
offset = new NoFilterOffset(0, index.getNumRows());
} else {
final ColumnSelectorBitmapIndexSelector selector = new ColumnSelectorBitmapIndexSelector(
index.getBitmapFactoryForDimensions(),
index
);
final Offset offset = new BitmapOffset(selector.getBitmapFactory(), filter.getBitmapIndex(selector));
sequence = new CursorSequenceBuilder(index, actualInterval, gran, offset).build();
offset = new BitmapOffset(selector.getBitmapFactory(), filter.getBitmapIndex(selector));
}
return Sequences.filter(sequence, Predicates.<Cursor>notNull());
return Sequences.filter(
new CursorSequenceBuilder(index, actualInterval, gran, offset).build(),
Predicates.<Cursor>notNull()
);
}
private static class CursorSequenceBuilder
@@ -619,6 +621,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
private final Offset baseOffset;
private final GenericColumn timestamps;
private final long threshold;
private final boolean allWithinThreshold;
public TimestampCheckingOffset(
Offset baseOffset,
@@ -629,6 +632,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
this.baseOffset = baseOffset;
this.timestamps = timestamps;
this.threshold = threshold;
// Precompute whether every timestamp in the column is below the threshold
// (last row is the max, since timestamps are sorted); if so, withinBounds()
// can skip the per-row timestamp lookup entirely.
this.allWithinThreshold = timestamps.getLongSingleValueRow(timestamps.length() - 1) < threshold;
}
@Override
@@ -646,7 +651,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public boolean withinBounds()
{
return baseOffset.withinBounds() && timestamps.getLongSingleValueRow(baseOffset.getOffset()) < threshold;
return baseOffset.withinBounds() && (allWithinThreshold
|| timestamps.getLongSingleValueRow(baseOffset.getOffset()) < threshold);
}
@Override
@@ -656,441 +662,39 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
}
private static class NoFilterCursorSequenceBuilder
private static class NoFilterOffset implements Offset
{
private final ColumnSelector index;
private final Interval interval;
private final QueryGranularity gran;
private final int rowCount;
private volatile int currentOffset;
public NoFilterCursorSequenceBuilder(
ColumnSelector index,
Interval interval,
QueryGranularity gran
)
NoFilterOffset(int currentOffset, int rowCount)
{
this.index = index;
this.interval = interval;
this.gran = gran;
this.currentOffset = currentOffset;
this.rowCount = rowCount;
}
/**
* This produces iterators of Cursor objects that must be fully processed (until isDone() returns true) before the
* next Cursor is processed. It is *not* safe to pass these cursors off to another thread for parallel processing
*
* @return
*/
public Sequence<Cursor> build()
@Override
public void increment()
{
final Map<String, DictionaryEncodedColumn> dictionaryColumnCache = Maps.newHashMap();
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
final Map<String, Object> objectColumnCache = Maps.newHashMap();
currentOffset++;
}
final GenericColumn timestamps = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
@Override
public boolean withinBounds()
{
return currentOffset < rowCount;
}
return Sequences.withBaggage(
Sequences.map(
Sequences.simple(gran.iterable(interval.getStartMillis(), interval.getEndMillis())),
new Function<Long, Cursor>()
{
private int currRow = 0;
@Override
public Offset clone()
{
return new NoFilterOffset(currentOffset, rowCount);
}
@Override
public Cursor apply(final Long input)
{
final long timeStart = Math.max(interval.getStartMillis(), input);
while (currRow < timestamps.length() && timestamps.getLongSingleValueRow(currRow) < timeStart) {
++currRow;
}
return new Cursor()
{
private final DateTime myBucket = gran.toDateTime(input);
private final long nextBucket = Math.min(gran.next(myBucket.getMillis()), interval.getEndMillis());
private final int initRow = currRow;
@Override
public DateTime getTime()
{
return myBucket;
}
@Override
public void advance()
{
if (Thread.interrupted()) {
throw new QueryInterruptedException();
}
++currRow;
}
@Override
public void advanceTo(int offset)
{
currRow += offset;
}
@Override
public boolean isDone()
{
return currRow >= timestamps.length() || timestamps.getLongSingleValueRow(currRow) >= nextBucket;
}
@Override
public void reset()
{
currRow = initRow;
}
@Override
public DimensionSelector makeDimensionSelector(String dimension)
{
final String dimensionName = dimension.toLowerCase();
DictionaryEncodedColumn cachedColumn = dictionaryColumnCache.get(dimensionName);
final Column columnDesc = index.getColumn(dimensionName);
if (cachedColumn == null && columnDesc != null) {
cachedColumn = columnDesc.getDictionaryEncoding();
dictionaryColumnCache.put(dimensionName, cachedColumn);
}
final DictionaryEncodedColumn column = cachedColumn;
if (column == null) {
return null;
} else if (columnDesc.getCapabilities().hasMultipleValues()) {
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
return column.getMultiValueRow(currRow);
}
@Override
public int getValueCardinality()
{
return column.getCardinality();
}
@Override
public String lookupName(int id)
{
final String retVal = column.lookupName(id);
return retVal == null ? "" : retVal;
}
@Override
public int lookupId(String name)
{
return column.lookupId(name);
}
};
} else {
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
// using an anonymous class is faster than creating a class that stores a copy of the value
return new IndexedInts()
{
@Override
public int size()
{
return 1;
}
@Override
public int get(int index)
{
return column.getSingleValueRow(currRow);
}
@Override
public Iterator<Integer> iterator()
{
return Iterators.singletonIterator(column.getSingleValueRow(currRow));
}
};
}
@Override
public int getValueCardinality()
{
return column.getCardinality();
}
@Override
public String lookupName(int id)
{
return column.lookupName(id);
}
@Override
public int lookupId(String name)
{
return column.lookupId(name);
}
};
}
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
final String metricName = columnName.toLowerCase();
GenericColumn cachedMetricVals = genericColumnCache.get(metricName);
if (cachedMetricVals == null) {
Column holder = index.getColumn(metricName);
if (holder != null && (holder.getCapabilities().getType() == ValueType.LONG
|| holder.getCapabilities().getType() == ValueType.FLOAT)) {
cachedMetricVals = holder.getGenericColumn();
genericColumnCache.put(metricName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return new FloatColumnSelector()
{
@Override
public float get()
{
return 0.0f;
}
};
}
final GenericColumn metricVals = cachedMetricVals;
return new FloatColumnSelector()
{
@Override
public float get()
{
return metricVals.getFloatSingleValueRow(currRow);
}
};
}
@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
final String metricName = columnName.toLowerCase();
GenericColumn cachedMetricVals = genericColumnCache.get(metricName);
if (cachedMetricVals == null) {
Column holder = index.getColumn(metricName);
if (holder != null && (holder.getCapabilities().getType() == ValueType.LONG
|| holder.getCapabilities().getType() == ValueType.FLOAT)) {
cachedMetricVals = holder.getGenericColumn();
genericColumnCache.put(metricName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return new LongColumnSelector()
{
@Override
public long get()
{
return 0L;
}
};
}
final GenericColumn metricVals = cachedMetricVals;
return new LongColumnSelector()
{
@Override
public long get()
{
return metricVals.getLongSingleValueRow(currRow);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String columnName = column.toLowerCase();
Object cachedColumnVals = objectColumnCache.get(columnName);
if (cachedColumnVals == null) {
Column holder = index.getColumn(columnName);
if (holder != null) {
final ValueType type = holder.getCapabilities().getType();
if (holder.getCapabilities().isDictionaryEncoded()) {
cachedColumnVals = holder.getDictionaryEncoding();
} else if (type == ValueType.COMPLEX) {
cachedColumnVals = holder.getComplexColumn();
} else {
cachedColumnVals = holder.getGenericColumn();
}
}
if (cachedColumnVals != null) {
objectColumnCache.put(columnName, cachedColumnVals);
}
}
if (cachedColumnVals == null) {
return null;
}
if (cachedColumnVals instanceof GenericColumn) {
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
final ValueType type = columnVals.getType();
if (columnVals.hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued GenericColumns"
);
}
if (type == ValueType.FLOAT) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return columnVals.getFloatSingleValueRow(currRow);
}
};
}
if (type == ValueType.LONG) {
return new ObjectColumnSelector<Long>()
{
@Override
public Class classOfObject()
{
return Long.TYPE;
}
@Override
public Long get()
{
return columnVals.getLongSingleValueRow(currRow);
}
};
}
if (type == ValueType.STRING) {
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.getStringSingleValueRow(currRow);
}
};
}
}
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
if (columnVals.hasMultipleValues()) {
return new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public Object get()
{
final IndexedInts multiValueRow = columnVals.getMultiValueRow(currRow);
if (multiValueRow.size() == 0) {
return null;
} else if (multiValueRow.size() == 1) {
return columnVals.lookupName(multiValueRow.get(0));
} else {
final String[] strings = new String[multiValueRow.size()];
for (int i = 0; i < multiValueRow.size(); i++) {
strings[i] = columnVals.lookupName(multiValueRow.get(i));
}
return strings;
}
}
};
} else {
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.lookupName(columnVals.getSingleValueRow(currRow));
}
};
}
}
final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals;
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return columnVals.getClazz();
}
@Override
public Object get()
{
return columnVals.getRowValue(currRow);
}
};
}
};
}
}
),
new Closeable()
{
@Override
public void close() throws IOException
{
CloseQuietly.close(timestamps);
for (DictionaryEncodedColumn column : dictionaryColumnCache.values()) {
CloseQuietly.close(column);
}
for (GenericColumn column : genericColumnCache.values()) {
CloseQuietly.close(column);
}
for (ComplexColumn complexColumn : complexColumnCache.values()) {
CloseQuietly.close(complexColumn);
}
for (Object column : objectColumnCache.values()) {
if (column instanceof Closeable) {
CloseQuietly.close((Closeable) column);
}
}
}
}
);
@Override
public int getOffset()
{
return currentOffset;
}
}
}

View File

@@ -22,7 +22,9 @@ package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -50,19 +52,28 @@ public class DataSchema
final Set<String> dimensionExclusions = Sets.newHashSet();
for (AggregatorFactory aggregator : aggregators) {
dimensionExclusions.add(aggregator.getName());
dimensionExclusions.addAll(aggregator.requiredFields());
}
if (parser != null && parser.getParseSpec() != null) {
if (parser.getParseSpec().getTimestampSpec() != null) {
dimensionExclusions.add(parser.getParseSpec().getTimestampSpec().getTimestampColumn());
final DimensionsSpec dimensionsSpec = parser.getParseSpec().getDimensionsSpec();
final TimestampSpec timestampSpec = parser.getParseSpec().getTimestampSpec();
// exclude timestamp from dimensions by default, unless explicitly included in the list of dimensions
if (timestampSpec != null) {
final String timestampColumn = timestampSpec.getTimestampColumn();
if (!(dimensionsSpec.hasCustomDimensions() && dimensionsSpec.getDimensions().contains(timestampColumn))) {
dimensionExclusions.add(timestampColumn);
}
}
if (parser.getParseSpec().getDimensionsSpec() != null) {
if (dimensionsSpec != null) {
this.parser = parser.withParseSpec(
parser.getParseSpec()
.withDimensionsSpec(
parser.getParseSpec()
.getDimensionsSpec()
.withDimensionExclusions(dimensionExclusions)
dimensionsSpec
.withDimensionExclusions(
Sets.difference(dimensionExclusions,
Sets.newHashSet(dimensionsSpec.getDimensions()))
)
)
);
} else {

View File

@@ -0,0 +1,87 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.indexing;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.junit.Assert;
import org.joda.time.Interval;
import org.junit.Test;
// Tests for DataSchema's automatic dimension-exclusion behavior: columns used
// by aggregators and the timestamp column should be excluded from dimensions
// unless the user explicitly lists them as dimensions.
// NOTE(review): blank lines appear stripped from this rendering of the file;
// code tokens are preserved as-is.
public class DataSchemaTest
{
@Test
public void testDefaultExclusions() throws Exception
{
// Dimensions "dimB"/"dimA" are explicit; timestamp column is "time".
DataSchema schema = new DataSchema(
"test",
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto"),
new DimensionsSpec(ImmutableList.of("dimB", "dimA"), null, null)
),
null, null, null, null
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015")))
);
// Expect the timestamp column plus the aggregator input columns to be
// excluded by default. (Aggregator output names "metric1"/"metric2" are
// not expected here — presumably handled elsewhere; verify against
// DataSchema's exclusion-merging logic.)
Assert.assertEquals(
ImmutableSet.of("time", "col1", "col2"),
schema.getParser().getParseSpec().getDimensionsSpec().getDimensionExclusions()
);
}
@Test
public void testExplicitInclude() throws Exception
{
// "time" and "col2" are explicitly listed as dimensions, so they must NOT
// be auto-excluded even though "time" is the timestamp column and "col2"
// feeds an aggregator.
DataSchema schema = new DataSchema(
"test",
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto"),
new DimensionsSpec(ImmutableList.of("time", "dimA", "dimB", "col2"), ImmutableList.of("dimC"), null)
),
null, null, null, null
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015")))
);
// Only the user-supplied exclusion "dimC" and the non-dimension aggregator
// input "col1" remain excluded.
Assert.assertEquals(
ImmutableSet.of("dimC", "col1"),
schema.getParser().getParseSpec().getDimensionsSpec().getDimensionExclusions()
);
}
}

View File

@@ -27,11 +27,12 @@ import io.druid.server.coordinator.CostBalancerStrategy;
import io.druid.server.coordinator.LoadQueuePeonTester;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import junit.framework.Assert;
import org.junit.Assert;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
@@ -109,19 +110,33 @@ public class CostBalancerStrategyTest
}
@Test
public void testCostBalancerMultithreadStrategy() throws InterruptedException
public void testCostBalancerMultiThreadedStrategy() throws InterruptedException
{
setupDummyCluster(10, 20);
DataSegment segment = getSegment(1000);
BalancerStrategy strategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC), 1);
final DateTime referenceTimestamp = new DateTime("2014-01-01");
BalancerStrategy strategy = new CostBalancerStrategy(referenceTimestamp, 4);
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
Assert.assertNotNull("Should be able to find a place for new segment!!", holder);
Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName());
}
@Test
public void testPerf() throws InterruptedException
public void testCostBalancerSingleThreadStrategy() throws InterruptedException
{
setupDummyCluster(10, 20);
DataSegment segment = getSegment(1000);
final DateTime referenceTimestamp = new DateTime("2014-01-01");
BalancerStrategy strategy = new CostBalancerStrategy(referenceTimestamp, 1);
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
Assert.assertNotNull("Should be able to find a place for new segment!!", holder);
Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName());
}
@Test @Ignore
public void testBenchmark() throws InterruptedException
{
setupDummyCluster(100, 500);
DataSegment segment = getSegment(1000);
@@ -140,10 +155,5 @@ public class CostBalancerStrategyTest
System.err.println("Latency - Single Threaded (ms): " + latencySingleThread);
System.err.println("Latency - Multi Threaded (ms): " + latencyMultiThread);
Assert.assertTrue("Latency of multi-thread strategy should always be less than single thread.", latencyMultiThread < latencySingleThread);
}
}