Merge branch 'master' of github.com:metamx/druid

fjy 2013-04-19 16:57:52 -07:00
commit 4c1cbdc5dc
38 changed files with 635 additions and 677 deletions


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
</parent>
<dependencies>


@ -21,7 +21,7 @@ package com.metamx.druid.aggregation;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.util.Comparator;
import java.util.List;
@ -29,7 +29,7 @@ import java.util.List;
/**
* Processing related interface
*
* An AggregatorFactory is an object that knows how to generate an Aggregator using a MetricSelectorFactory.
* An AggregatorFactory is an object that knows how to generate an Aggregator using a ColumnSelectorFactory.
*
* This is useful as an abstraction to allow Aggregator classes to be written in terms of MetricSelector objects
* without making any assumptions about how they are pulling values out of the base data. That is, the data is
@ -48,8 +48,8 @@ import java.util.List;
})
public interface AggregatorFactory
{
public Aggregator factorize(MetricSelectorFactory metricFactory);
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory);
public Aggregator factorize(ColumnSelectorFactory metricFactory);
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory);
public Comparator getComparator();
/**

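To make the renamed abstraction concrete, here is a minimal sketch (not part of this commit) of a ColumnSelectorFactory whose selectors serve a constant value; any AggregatorFactory.factorize() call could be driven by something like it. The class name and the constant-value behavior are invented for illustration; the interface methods are the ones shown in this diff.

import com.metamx.druid.processing.ColumnSelectorFactory;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;

// Illustrative only: every selector handed out by this factory returns the same
// float, which is enough to unit-test how a factory wires up its aggregators.
public class ConstantColumnSelectorFactory implements ColumnSelectorFactory
{
  private final float value;

  public ConstantColumnSelectorFactory(float value)
  {
    this.value = value;
  }

  @Override
  public FloatMetricSelector makeFloatMetricSelector(String metricName)
  {
    return new FloatMetricSelector()
    {
      @Override
      public float get()
      {
        return value;
      }
    };
  }

  @Override
  public ComplexMetricSelector makeComplexMetricSelector(String metricName)
  {
    return null; // no complex metrics in this sketch
  }

  @Override
  public ObjectColumnSelector makeObjectColumnSelector(String columnName)
  {
    return new ObjectColumnSelector<Float>()
    {
      @Override
      public Class<Float> classOfObject()
      {
        return Float.class;
      }

      @Override
      public Float get()
      {
        return value;
      }
    };
  }
}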

@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Longs;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.util.Comparator;
import java.util.List;
@ -44,13 +44,13 @@ public class CountAggregatorFactory implements AggregatorFactory
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new CountAggregator(name);
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new CountBufferAggregator();
}


@ -22,7 +22,7 @@ package com.metamx.druid.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Doubles;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -49,7 +49,7 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new DoubleSumAggregator(
name,
@ -58,7 +58,7 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleSumBufferAggregator(metricFactory.makeFloatMetricSelector(fieldName));
}


@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.primitives.Floats;
import com.google.common.primitives.Longs;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import org.apache.commons.codec.binary.Base64;
import java.nio.ByteBuffer;
@ -56,7 +56,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory
for(int i = 0; i < breaksList.size(); ++i) this.breaks[i] = breaksList.get(i);
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new HistogramAggregator(
name,
@ -66,7 +66,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new HistogramBufferAggregator(
metricFactory.makeFloatMetricSelector(fieldName),


@ -20,7 +20,7 @@
package com.metamx.druid.aggregation;
import com.google.common.collect.Lists;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
import java.util.List;
@ -28,7 +28,7 @@ public class JavaScriptAggregator implements Aggregator
{
static interface ScriptAggregator
{
public double aggregate(double current, FloatMetricSelector[] selectorList);
public double aggregate(double current, ObjectColumnSelector[] selectorList);
public double combine(double a, double b);
@ -38,15 +38,15 @@ public class JavaScriptAggregator implements Aggregator
}
private final String name;
private final FloatMetricSelector[] selectorList;
private final ObjectColumnSelector[] selectorList;
private final ScriptAggregator script;
private volatile double current;
public JavaScriptAggregator(String name, List<FloatMetricSelector> selectorList, ScriptAggregator script)
public JavaScriptAggregator(String name, List<ObjectColumnSelector> selectorList, ScriptAggregator script)
{
this.name = name;
this.selectorList = Lists.newArrayList(selectorList).toArray(new FloatMetricSelector[]{});
this.selectorList = Lists.newArrayList(selectorList).toArray(new ObjectColumnSelector[]{});
this.script = script;
this.current = script.reset();

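The ScriptAggregator contract above now receives ObjectColumnSelector[] instead of FloatMetricSelector[], so scripts can see non-float columns. As a rough illustration (not part of this commit), the same contract expressed directly in Java for a sum over numeric columns might look like the sketch below; the class name is hypothetical, real implementations are produced by compiling the user's JavaScript, and the actual interface may declare methods not visible in this hunk.

import com.metamx.druid.processing.ObjectColumnSelector;

// Hypothetical Java stand-in for a compiled script: mirrors the three functions a
// user supplies (aggregate / reset / combine) and sums every numeric column value.
public class SumNumericColumnsScript
{
  public double aggregate(double current, ObjectColumnSelector[] selectorList)
  {
    double sum = current;
    for (ObjectColumnSelector selector : selectorList) {
      final Object value = selector.get();
      if (value instanceof Number) {   // float metrics arrive boxed, e.g. via MetricSelectorUtils.wrap
        sum += ((Number) value).doubleValue();
      }
    }
    return sum;
  }

  public double combine(double a, double b)
  {
    return a + b;
  }

  public double reset()
  {
    return 0;
  }
}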

@ -21,18 +21,16 @@ package com.metamx.druid.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.primitives.Doubles;
import com.metamx.common.IAE;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import com.metamx.druid.processing.ObjectColumnSelector;
import org.mozilla.javascript.Context;
import org.mozilla.javascript.ContextAction;
import org.mozilla.javascript.ContextFactory;
import org.mozilla.javascript.Function;
import org.mozilla.javascript.Script;
import org.mozilla.javascript.ScriptableObject;
import javax.annotation.Nullable;
@ -48,7 +46,11 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
private final String name;
private final List<String> fieldNames;
private final String script;
private final String fnAggregate;
private final String fnReset;
private final String fnCombine;
private final JavaScriptAggregator.ScriptAggregator compiledScript;
@ -56,26 +58,34 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
public JavaScriptAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldNames") final List<String> fieldNames,
@JsonProperty("script") final String expression
@JsonProperty("fnAggregate") final String fnAggregate,
@JsonProperty("fnReset") final String fnReset,
@JsonProperty("fnCombine") final String fnCombine
)
{
this.name = name;
this.script = expression;
this.fieldNames = fieldNames;
this.compiledScript = compileScript(script);
this.fnAggregate = fnAggregate;
this.fnReset = fnReset;
this.fnCombine = fnCombine;
this.compiledScript = compileScript(fnAggregate, fnReset, fnCombine);
}
@Override
public Aggregator factorize(final MetricSelectorFactory metricFactory)
public Aggregator factorize(final ColumnSelectorFactory columnFactory)
{
return new JavaScriptAggregator(
name,
Lists.transform(
fieldNames,
new com.google.common.base.Function<String, FloatMetricSelector>()
new com.google.common.base.Function<String, ObjectColumnSelector>()
{
@Override
public FloatMetricSelector apply(@Nullable String s) { return metricFactory.makeFloatMetricSelector(s); }
public ObjectColumnSelector apply(@Nullable String s) {
return columnFactory.makeObjectColumnSelector(s);
}
}
),
compiledScript
@ -83,17 +93,16 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
}
@Override
public BufferAggregator factorizeBuffered(final MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(final ColumnSelectorFactory columnSelectorFactory)
{
return new JavaScriptBufferAggregator(
Lists.transform(
fieldNames,
new com.google.common.base.Function<String, FloatMetricSelector>()
new com.google.common.base.Function<String, ObjectColumnSelector>()
{
@Override
public FloatMetricSelector apply(@Nullable String s)
{
return metricFactory.makeFloatMetricSelector(s);
public ObjectColumnSelector apply(@Nullable String s) {
return columnSelectorFactory.makeObjectColumnSelector(s);
}
}
),
@ -116,7 +125,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
@Override
public AggregatorFactory getCombiningFactory()
{
throw new UnsupportedOperationException();
return new JavaScriptAggregatorFactory(name, Lists.newArrayList(name), fnCombine, fnReset, fnCombine);
}
@Override
@ -144,8 +153,21 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
}
@JsonProperty
public String getScript() {
return script;
public String getFnAggregate()
{
return fnAggregate;
}
@JsonProperty
public String getFnReset()
{
return fnReset;
}
@JsonProperty
public String getFnCombine()
{
return fnCombine;
}
@Override
@ -159,8 +181,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
{
try {
MessageDigest md = MessageDigest.getInstance("SHA-1");
byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes();
byte[] sha1 = md.digest(script.getBytes());
byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes(Charsets.UTF_8);
byte[] sha1 = md.digest((fnAggregate+fnReset+fnCombine).getBytes(Charsets.UTF_8));
return ByteBuffer.allocate(1 + fieldNameBytes.length + sha1.length)
.put(CACHE_TYPE_ID)
@ -197,21 +219,13 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
return "JavaScriptAggregatorFactory{" +
"name='" + name + '\'' +
", fieldNames=" + fieldNames +
", script='" + script + '\'' +
", fnAggregate='" + fnAggregate + '\'' +
", fnReset='" + fnReset + '\'' +
", fnCombine='" + fnCombine + '\'' +
'}';
}
protected static Function getScriptFunction(String name, ScriptableObject scope)
{
Object fun = scope.get(name, scope);
if (fun instanceof Function) {
return (Function) fun;
} else {
throw new IAE("Function [%s] not defined in script", name);
}
}
public static JavaScriptAggregator.ScriptAggregator compileScript(final String script)
public static JavaScriptAggregator.ScriptAggregator compileScript(final String aggregate, final String reset, final String combine)
{
final ContextFactory contextFactory = ContextFactory.getGlobal();
Context context = contextFactory.enterContext();
@ -219,18 +233,15 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
final ScriptableObject scope = context.initStandardObjects();
Script compiledScript = context.compileString(script, "script", 1, null);
compiledScript.exec(context, scope);
final Function fnAggregate = getScriptFunction("aggregate", scope);
final Function fnReset = getScriptFunction("reset", scope);
final Function fnCombine = getScriptFunction("combine", scope);
final Function fnAggregate = context.compileFunction(scope, aggregate, "aggregate", 1, null);
final Function fnReset = context.compileFunction(scope, reset, "reset", 1, null);
final Function fnCombine = context.compileFunction(scope, combine, "combine", 1, null);
Context.exit();
return new JavaScriptAggregator.ScriptAggregator()
{
@Override
public double aggregate(final double current, final FloatMetricSelector[] selectorList)
public double aggregate(final double current, final ObjectColumnSelector[] selectorList)
{
Context cx = Context.getCurrentContext();
if(cx == null) cx = contextFactory.enterContext();

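The factory now takes the three JavaScript functions as separate properties instead of one combined script. A hedged construction sketch follows, mirroring the constructor order declared above (name, fieldNames, fnAggregate, fnReset, fnCombine); the wrapper class and method names are invented, and the function bodies follow the ones used in the updated test further down.

import java.util.Arrays;

import com.metamx.druid.aggregation.JavaScriptAggregatorFactory;

public class JavaScriptAggregatorFactoryExample
{
  public static JavaScriptAggregatorFactory sumLogXTimesY()
  {
    // Each function string is compiled separately via Context.compileFunction, so
    // the functions no longer have to be looked up by name from a shared scope.
    return new JavaScriptAggregatorFactory(
        "sumLogXTimesY",
        Arrays.asList("x", "y"),
        "function aggregate(current, x, y) { return current + (Math.log(x) * y) }",
        "function reset() { return 10 }",
        "function combine(a, b) { return a + b }"
    );
  }
}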

@ -20,22 +20,24 @@
package com.metamx.druid.aggregation;
import com.google.common.collect.Lists;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
import java.nio.ByteBuffer;
import java.util.List;
public class JavaScriptBufferAggregator implements BufferAggregator
{
private final FloatMetricSelector[] selectorList;
private final ObjectColumnSelector[] selectorList;
private final JavaScriptAggregator.ScriptAggregator script;
public JavaScriptBufferAggregator(
List<FloatMetricSelector> selectorList,
List<ObjectColumnSelector> selectorList,
JavaScriptAggregator.ScriptAggregator script
)
{
this.selectorList = Lists.newArrayList(selectorList).toArray(new FloatMetricSelector[]{});
this.selectorList = Lists.newArrayList(selectorList).toArray(new ObjectColumnSelector[]{});
this.script = script;
}


@ -22,7 +22,7 @@ package com.metamx.druid.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Longs;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -49,7 +49,7 @@ public class LongSumAggregatorFactory implements AggregatorFactory
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new LongSumAggregator(
name,
@ -58,7 +58,7 @@ public class LongSumAggregatorFactory implements AggregatorFactory
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new LongSumBufferAggregator(metricFactory.makeFloatMetricSelector(fieldName));
}


@ -22,7 +22,7 @@ package com.metamx.druid.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Doubles;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -49,13 +49,13 @@ public class MaxAggregatorFactory implements AggregatorFactory
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new MaxAggregator(name, metricFactory.makeFloatMetricSelector(fieldName));
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new MaxBufferAggregator(metricFactory.makeFloatMetricSelector(fieldName));
}


@ -22,7 +22,7 @@ package com.metamx.druid.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Doubles;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -49,13 +49,13 @@ public class MinAggregatorFactory implements AggregatorFactory
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new MinAggregator(name, metricFactory.makeFloatMetricSelector(fieldName));
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new MinBufferAggregator(metricFactory.makeFloatMetricSelector(fieldName));
}


@ -19,7 +19,7 @@
package com.metamx.druid.aggregation;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.util.Comparator;
import java.util.List;
@ -36,13 +36,13 @@ public class ToLowerCaseAggregatorFactory implements AggregatorFactory
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return baseAggregatorFactory.factorize(metricFactory);
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return baseAggregatorFactory.factorizeBuffered(metricFactory);
}


@ -22,8 +22,9 @@ package com.metamx.druid.processing;
/**
* Factory class for MetricSelectors
*/
public interface MetricSelectorFactory
public interface ColumnSelectorFactory
{
public FloatMetricSelector makeFloatMetricSelector(String metricName);
public ComplexMetricSelector makeComplexMetricSelector(String metricName);
public ObjectColumnSelector makeObjectColumnSelector(String columnName);
}


@ -1,5 +0,0 @@
package com.metamx.druid.processing;
public class MetricSelectorUtils
{
}


@ -0,0 +1,26 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.processing;
public interface ObjectColumnSelector<T>
{
public Class<T> classOfObject();
public T get();
}


@ -20,25 +20,29 @@
package com.metamx.druid.aggregation;
import com.google.common.collect.Lists;
import com.google.common.primitives.Doubles;
import com.metamx.druid.processing.FloatMetricSelector;
import com.google.common.collect.Maps;
import com.metamx.druid.processing.ObjectColumnSelector;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
public class JavaScriptAggregatorTest
{
protected static final String sumLogATimesBPlusTen =
"function aggregate(current, a, b) { return current + (Math.log(a) * b) }"
+ "function combine(a,b) { return a + b }"
+ "function reset() { return 10 }";
protected static final Map<String, String> sumLogATimesBPlusTen = Maps.newHashMap();
protected static final Map<String, String> scriptDoubleSum = Maps.newHashMap();
protected static final String scriptDoubleSum =
"function aggregate(current, a) { return current + a }"
+ "function combine(a,b) { return a + b }"
+ "function reset() { return 0 }";
static {
sumLogATimesBPlusTen.put("fnAggregate", "function aggregate(current, a, b) { return current + (Math.log(a) * b) }");
sumLogATimesBPlusTen.put("fnReset", "function reset() { return 10 }");
sumLogATimesBPlusTen.put("fnCombine", "function combine(a,b) { return a + b }");
scriptDoubleSum.put("fnAggregate", "function aggregate(current, a) { return current + a }");
scriptDoubleSum.put("fnReset", "function reset() { return 0 }");
scriptDoubleSum.put("fnCombine", "function combine(a,b) { return a + b }");
}
private static void aggregate(TestFloatMetricSelector selector1, TestFloatMetricSelector selector2, Aggregator agg)
{
@ -69,10 +73,14 @@ public class JavaScriptAggregatorTest
final TestFloatMetricSelector selector1 = new TestFloatMetricSelector(new float[]{42.12f, 9f});
final TestFloatMetricSelector selector2 = new TestFloatMetricSelector(new float[]{2f, 3f});
Map<String, String> script = sumLogATimesBPlusTen;
JavaScriptAggregator agg = new JavaScriptAggregator(
"billy",
Arrays.<FloatMetricSelector>asList(selector1, selector2),
JavaScriptAggregatorFactory.compileScript(sumLogATimesBPlusTen)
Arrays.<ObjectColumnSelector>asList(MetricSelectorUtils.wrap(selector1), MetricSelectorUtils.wrap(selector2)),
JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"),
script.get("fnReset"),
script.get("fnCombine"))
);
agg.reset();
@ -103,9 +111,12 @@ public class JavaScriptAggregatorTest
final TestFloatMetricSelector selector1 = new TestFloatMetricSelector(new float[]{42.12f, 9f});
final TestFloatMetricSelector selector2 = new TestFloatMetricSelector(new float[]{2f, 3f});
Map<String, String> script = sumLogATimesBPlusTen;
JavaScriptBufferAggregator agg = new JavaScriptBufferAggregator(
Arrays.<FloatMetricSelector>asList(selector1, selector2),
JavaScriptAggregatorFactory.compileScript(sumLogATimesBPlusTen)
Arrays.<ObjectColumnSelector>asList(MetricSelectorUtils.wrap(selector1), MetricSelectorUtils.wrap(selector2)),
JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"),
script.get("fnReset"),
script.get("fnCombine"))
);
ByteBuffer buf = ByteBuffer.allocateDirect(32);
@ -150,10 +161,13 @@ public class JavaScriptAggregatorTest
}
*/
Map<String, String> script = scriptDoubleSum;
JavaScriptAggregator aggRhino = new JavaScriptAggregator(
"billy",
Lists.asList(selector, new FloatMetricSelector[]{}),
JavaScriptAggregatorFactory.compileScript(scriptDoubleSum)
Lists.asList(MetricSelectorUtils.wrap(selector), new ObjectColumnSelector[]{}),
JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"),
script.get("fnReset"),
script.get("fnCombine"))
);
DoubleSumAggregator doubleAgg = new DoubleSumAggregator("billy", selector);


@ -0,0 +1,45 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.aggregation;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
public class MetricSelectorUtils
{
public static ObjectColumnSelector<Float> wrap(final FloatMetricSelector selector)
{
return new ObjectColumnSelector<Float>()
{
@Override
public Class<Float> classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return selector.get();
}
};
}
}


@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
</parent>
<modules>


@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
</parent>
<dependencies>


@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
</parent>
<dependencies>


@ -36,15 +36,21 @@ import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.Aggregator;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.post.PostAggregator;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.index.column.ComplexColumn;
import com.metamx.druid.index.column.DictionaryEncodedColumn;
import com.metamx.druid.index.column.GenericColumn;
import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.index.v1.serde.ComplexMetricExtractor;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedRow;
import com.metamx.druid.input.Row;
import com.metamx.druid.processing.ColumnSelectorFactory;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ObjectColumnSelector;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -169,7 +175,7 @@ public class IncrementalIndex implements Iterable<Row>
for (int i = 0; i < metrics.length; ++i) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
new MetricSelectorFactory()
new ColumnSelectorFactory()
{
@Override
public FloatMetricSelector makeFloatMetricSelector(String metric)
@ -189,6 +195,7 @@ public class IncrementalIndex implements Iterable<Row>
public ComplexMetricSelector makeComplexMetricSelector(final String metric)
{
final String typeName = agg.getTypeName();
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
@ -213,6 +220,53 @@ public class IncrementalIndex implements Iterable<Row>
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String typeName = agg.getTypeName();
final String columnName = column.toLowerCase();
if(typeName.equals("float")) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return in.getFloatMetric(columnName);
}
};
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
@Override
public Object get()
{
return extractor.extractValue(in, columnName);
}
};
}
}
);
}


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
</parent>
<dependencies>


@ -23,6 +23,8 @@ import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@ -81,7 +83,28 @@ public class ExecutorServiceTaskRunner implements TaskRunner, QuerySegmentWalker
public ListenableFuture<TaskStatus> run(final Task task)
{
final TaskToolbox toolbox = toolboxFactory.build(task);
return exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox));
final ListenableFuture<TaskStatus> statusFuture = exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox));
final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture, null, new DateTime());
runningItems.add(taskRunnerWorkItem);
Futures.addCallback(
statusFuture, new FutureCallback<TaskStatus>()
{
@Override
public void onSuccess(TaskStatus result)
{
runningItems.remove(taskRunnerWorkItem);
}
@Override
public void onFailure(Throwable t)
{
runningItems.remove(taskRunnerWorkItem);
}
}
);
return statusFuture;
}
@Override
@ -97,34 +120,13 @@ public class ExecutorServiceTaskRunner implements TaskRunner, QuerySegmentWalker
@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
return runningItems;
return ImmutableList.copyOf(runningItems);
}
@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
if (exec instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
return Lists.newArrayList(
FunctionalIterable.create(tpe.getQueue())
.keep(
new Function<Runnable, TaskRunnerWorkItem>()
{
@Override
public TaskRunnerWorkItem apply(Runnable input)
{
if (input instanceof ExecutorServiceTaskRunnerCallable) {
return ((ExecutorServiceTaskRunnerCallable) input).getTaskRunnerWorkItem();
}
return null;
}
}
)
);
}
return Lists.newArrayList();
return ImmutableList.of();
}
@Override

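The run() change above registers each submitted task in runningItems and relies on a Guava FutureCallback to unregister it when the status future settles. The same bookkeeping pattern is restated in isolation below, with plain strings instead of tasks; all names here are invented for illustration.

import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class TrackedExecutorExample
{
  private final ListeningExecutorService exec =
      MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
  private final ConcurrentLinkedQueue<ListenableFuture<String>> running =
      new ConcurrentLinkedQueue<ListenableFuture<String>>();

  public ListenableFuture<String> submit(Callable<String> work)
  {
    final ListenableFuture<String> future = exec.submit(work);
    running.add(future);
    Futures.addCallback(
        future,
        new FutureCallback<String>()
        {
          @Override
          public void onSuccess(String result)
          {
            running.remove(future);   // work finished normally
          }

          @Override
          public void onFailure(Throwable t)
          {
            running.remove(future);   // work failed or was cancelled
          }
        }
    );
    return future;
  }

  public Collection<ListenableFuture<String>> getRunning()
  {
    // Defensive snapshot, matching the getRunningTasks() change above.
    return ImmutableList.copyOf(running);
  }
}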

@ -125,11 +125,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
// time to adjust process holders
synchronized (tasks) {
if (Thread.interrupted()) {
throw new InterruptedException();
final TaskInfo taskInfo = tasks.get(task.getId());
if (taskInfo.shutdown) {
throw new IllegalStateException("Task has been shut down!");
}
final TaskInfo taskInfo = tasks.get(task.getId());
if (taskInfo == null) {
throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId());
}
@ -159,7 +160,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
String.format(
"-D%s=%s",
propName.substring(CHILD_PROPERTY_PREFIX.length()),
System.getProperty(propName)
props.getProperty(propName)
)
);
}
@ -227,11 +228,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
return TaskStatus.failure(task.getId());
}
}
catch (InterruptedException e) {
log.info(e, "Interrupted during execution");
return TaskStatus.failure(task.getId());
}
catch (IOException e) {
catch (Exception e) {
log.info(e, "Exception caught during execution");
throw Throwables.propagate(e);
}
finally {
@ -288,9 +286,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
log.info("Ignoring request to cancel unknown task: %s", taskid);
return;
}
}
taskInfo.statusFuture.cancel(true);
taskInfo.shutdown = true;
}
if (taskInfo.processHolder != null) {
final int shutdowns = taskInfo.processHolder.shutdowns.getAndIncrement();
@ -397,6 +395,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
private static class TaskInfo
{
private final ListenableFuture<TaskStatus> statusFuture;
private volatile boolean shutdown = false;
private volatile ProcessHolder processHolder = null;
private TaskInfo(ListenableFuture<TaskStatus> statusFuture)


@ -573,9 +573,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
if (taskStatus.isComplete()) {
if (taskRunnerWorkItem != null) {
final SettableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
if (result != null) {
result.set(taskStatus);
((SettableFuture<TaskStatus>) result).set(taskStatus);
}
}


@ -20,6 +20,9 @@
package com.metamx.druid.merger.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.druid.merger.common.RetryPolicy;
import com.metamx.druid.merger.common.TaskStatus;
@ -33,7 +36,7 @@ import org.joda.time.DateTimeComparator;
public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
{
private final Task task;
private final SettableFuture<TaskStatus> result;
private final ListenableFuture<TaskStatus> result;
private final RetryPolicy retryPolicy;
private final DateTime createdTime;
@ -41,7 +44,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
public TaskRunnerWorkItem(
Task task,
SettableFuture<TaskStatus> result,
ListenableFuture<TaskStatus> result,
RetryPolicy retryPolicy,
DateTime createdTime
)
@ -58,7 +61,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
return task;
}
public SettableFuture<TaskStatus> getResult()
public ListenableFuture<TaskStatus> getResult()
{
return result;
}
@ -89,7 +92,10 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
@Override
public int compareTo(TaskRunnerWorkItem taskRunnerWorkItem)
{
return DateTimeComparator.getInstance().compare(createdTime, taskRunnerWorkItem.getCreatedTime());
return ComparisonChain.start()
.compare(createdTime, taskRunnerWorkItem.getCreatedTime(), DateTimeComparator.getInstance())
.compare(task.getId(), taskRunnerWorkItem.getTask().getId())
.result();
}
@Override

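The compareTo change above orders work items by creation time first and breaks ties on task id, so two items created in the same millisecond no longer compare as equal. A minimal standalone illustration of the ComparisonChain pattern; nothing here is Druid-specific and the class and method names are invented.

import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;

import com.google.common.collect.ComparisonChain;

public class WorkItemOrderingExample
{
  static int compare(DateTime createdA, String idA, DateTime createdB, String idB)
  {
    return ComparisonChain.start()
                          .compare(createdA, createdB, DateTimeComparator.getInstance())
                          .compare(idA, idB)
                          .result();
  }

  public static void main(String[] args)
  {
    final DateTime now = new DateTime();
    // Same timestamp: comparing by time alone would return 0, but the id tiebreak
    // yields a negative value because "task-a" sorts before "task-b".
    System.out.println(compare(now, "task-a", now, "task-b"));
  }
}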

@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
</parent>
<dependencies>


@ -43,6 +43,7 @@ import com.metamx.druid.index.v1.serde.ComplexMetrics;
import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
import com.metamx.druid.query.search.SearchHit;
import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.query.search.SearchQuerySpec;
@ -359,6 +360,61 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String columnName = column.toLowerCase();
final Integer metricIndexInt = index.getMetricIndex(columnName);
if(metricIndexInt != null) {
final int metricIndex = metricIndexInt;
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(columnName));
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return serde.getObjectStrategy().getClazz();
}
@Override
public Object get()
{
return currEntry.getValue()[metricIndex].get();
}
};
}
final Integer dimensionIndexInt = index.getDimensionIndex(columnName);
if(dimensionIndexInt != null) {
final int dimensionIndex = dimensionIndexInt;
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex];
if(dimVals.length == 1) return dimVals[0];
if(dimVals.length == 0) return null;
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
);
}
};
}
return null;
}
};
}
}


@ -1,523 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.metamx.common.Pair;
import com.metamx.common.collect.MoreIterators;
import com.metamx.common.guava.FunctionalIterator;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseStorageAdapter;
import com.metamx.druid.Capabilities;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.index.brita.BitmapIndexSelector;
import com.metamx.druid.index.brita.Filter;
import com.metamx.druid.index.v1.processing.ArrayBasedOffset;
import com.metamx.druid.index.v1.processing.Cursor;
import com.metamx.druid.index.v1.processing.DimensionSelector;
import com.metamx.druid.index.v1.processing.Offset;
import com.metamx.druid.index.v1.processing.StartLimitedOffset;
import com.metamx.druid.kv.ArrayBasedIndexedInts;
import com.metamx.druid.kv.ArrayIndexed;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedFloats;
import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.kv.ListIndexed;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
/**
*/
public class IndexStorageAdapter extends BaseStorageAdapter
{
private final Logger log = new Logger(IndexStorageAdapter.class);
private final Index index;
private final int[] ids;
private final Capabilities capabilities;
public IndexStorageAdapter(
Index index
)
{
this.index = index;
capabilities = Capabilities.builder()
.dimensionValuesSorted(isReverseDimSorted())
.build();
ids = new int[index.timeOffsets.length];
for (int i = 0; i < ids.length; i++) {
ids[i] = i;
}
}
@Override
public String getSegmentIdentifier()
{
throw new UnsupportedOperationException();
}
@Override
public Interval getInterval()
{
return index.dataInterval;
}
@Override
public int getDimensionCardinality(String dimension)
{
final String[] strings = index.reverseDimLookup.get(dimension);
return strings == null ? 0 : strings.length;
}
@Override
public DateTime getMinTime()
{
return new DateTime(index.timeOffsets[0]);
}
@Override
public DateTime getMaxTime()
{
return new DateTime(index.timeOffsets[index.timeOffsets.length - 1]);
}
@Override
public Iterable<Cursor> makeCursors(final Filter filter, final Interval interval, final QueryGranularity gran)
{
Interval actualIntervalTmp = interval;
if (!actualIntervalTmp.overlaps(index.dataInterval)) {
return ImmutableList.of();
}
if (actualIntervalTmp.getStart().isBefore(index.dataInterval.getStart())) {
actualIntervalTmp = actualIntervalTmp.withStart(index.dataInterval.getStart());
}
if (actualIntervalTmp.getEnd().isAfter(index.dataInterval.getEnd())) {
actualIntervalTmp = actualIntervalTmp.withEnd(index.dataInterval.getEnd());
}
final Interval actualInterval = actualIntervalTmp;
final Pair<Integer, Integer> intervalStartAndEnd = computeTimeStartEnd(actualInterval);
return new Iterable<Cursor>()
{
@Override
public Iterator<Cursor> iterator()
{
final Offset baseOffset;
if (filter == null) {
baseOffset = new ArrayBasedOffset(ids, intervalStartAndEnd.lhs);
} else {
baseOffset = new StartLimitedOffset(
new ConciseOffset(filter.goConcise(new IndexBasedBitmapIndexSelector(index))),
intervalStartAndEnd.lhs
);
}
final Map<String, Object> metricHolderCache = Maps.newHashMap();
// This after call is not perfect, if there is an exception during processing, it will never get called,
// but it's better than nothing and doing this properly all the time requires a lot more fixerating
return MoreIterators.after(
FunctionalIterator
.create(gran.iterable(actualInterval.getStartMillis(), actualInterval.getEndMillis()).iterator())
.keep(
new Function<Long, Cursor>()
{
@Override
public Cursor apply(final Long intervalStart)
{
final Offset offset = new TimestampCheckingOffset(
baseOffset,
index.timeOffsets,
Math.min(actualInterval.getEndMillis(), gran.next(intervalStart))
);
return new Cursor()
{
private final Offset initOffset = offset.clone();
private Offset cursorOffset = offset;
private final DateTime timestamp = gran.toDateTime(intervalStart);
@Override
public DateTime getTime()
{
return timestamp;
}
@Override
public void advance()
{
cursorOffset.increment();
}
@Override
public boolean isDone()
{
return !cursorOffset.withinBounds();
}
@Override
public void reset()
{
cursorOffset = initOffset.clone();
}
@Override
public DimensionSelector makeDimensionSelector(String dimension)
{
final String dimensionName = dimension.toLowerCase();
final String[] nameLookup = index.reverseDimLookup.get(dimensionName);
if (nameLookup == null) {
return null;
}
return new DimensionSelector()
{
final Map<String, Integer> dimValLookup = index.dimIdLookup.get(dimensionName);
final DimensionColumn dimColumn = index.dimensionValues.get(dimensionName);
final int[][] dimensionExpansions = dimColumn.getDimensionExpansions();
final int[] dimensionRowValues = dimColumn.getDimensionRowValues();
@Override
public IndexedInts getRow()
{
return new ArrayBasedIndexedInts(dimensionExpansions[dimensionRowValues[cursorOffset.getOffset()]]);
}
@Override
public int getValueCardinality()
{
return nameLookup.length;
}
@Override
public String lookupName(int id)
{
return nameLookup[id];
}
@Override
public int lookupId(String name)
{
final Integer retVal = dimValLookup.get(name);
return retVal == null ? -1 : retVal;
}
};
}
@Override
public FloatMetricSelector makeFloatMetricSelector(String metric)
{
String metricName = metric.toLowerCase();
IndexedFloats cachedFloats = (IndexedFloats) metricHolderCache.get(metric);
if (cachedFloats == null) {
MetricHolder holder = index.metricVals.get(metricName);
if (holder == null) {
return new FloatMetricSelector()
{
@Override
public float get()
{
return 0.0f;
}
};
}
cachedFloats = holder.getFloatType();
metricHolderCache.put(metricName, cachedFloats);
}
final IndexedFloats metricVals = cachedFloats;
return new FloatMetricSelector()
{
@Override
public float get()
{
return metricVals.get(cursorOffset.getOffset());
}
};
}
@Override
public ComplexMetricSelector makeComplexMetricSelector(String metric)
{
final String metricName = metric.toLowerCase();
Indexed cachedComplex = (Indexed) metricHolderCache.get(metricName);
if (cachedComplex == null) {
MetricHolder holder = index.metricVals.get(metricName);
if (holder != null) {
cachedComplex = holder.getComplexType();
metricHolderCache.put(metricName, cachedComplex);
}
}
if (cachedComplex == null) {
return null;
}
final Indexed vals = cachedComplex;
return new ComplexMetricSelector()
{
@Override
public Class classOfObject()
{
return vals.getClazz();
}
@Override
public Object get()
{
return vals.get(cursorOffset.getOffset());
}
};
}
};
}
}
),
new Runnable()
{
@Override
public void run()
{
for (Object object : metricHolderCache.values()) {
if (object instanceof Closeable) {
Closeables.closeQuietly((Closeable) object);
}
}
}
}
);
}
};
}
@Override
public Indexed<String> getAvailableDimensions()
{
return new ArrayIndexed<String>(index.dimensions, String.class);
}
@Override
public Indexed<String> getDimValueLookup(String dimension)
{
return new ListIndexed<String>(
Lists.newArrayList(index.dimIdLookup.get(dimension.toLowerCase()).keySet()), String.class
);
}
@Override
public ImmutableConciseSet getInvertedIndex(String dimension, String dimVal)
{
return index.getInvertedIndex(dimension.toLowerCase(), dimVal);
}
@Override
public Offset getFilterOffset(Filter filter)
{
return new ConciseOffset(filter.goConcise(new IndexBasedBitmapIndexSelector(index)));
}
@Override
public Capabilities getCapabilities()
{
return capabilities;
}
private boolean isReverseDimSorted()
{
for (Map.Entry<String, String[]> entry : index.reverseDimLookup.entrySet()) {
String[] arr = entry.getValue();
for (int i = 0; i < arr.length - 1; i++) {
if (arr[i].compareTo(arr[i + 1]) > 0) {
return false;
}
}
}
return true;
}
private Pair<Integer, Integer> computeTimeStartEnd(Interval interval)
{
DateTime actualIntervalStart = index.dataInterval.getStart();
DateTime actualIntervalEnd = index.dataInterval.getEnd();
if (index.dataInterval.contains(interval.getStart())) {
actualIntervalStart = interval.getStart();
}
if (index.dataInterval.contains(interval.getEnd())) {
actualIntervalEnd = interval.getEnd();
}
return computeOffsets(actualIntervalStart.getMillis(), 0, actualIntervalEnd.getMillis(), index.timeOffsets.length);
}
private Pair<Integer, Integer> computeOffsets(long startMillis, int startOffset, long endMillis, int endOffset)
{
int startIndex = startOffset;
int endIndex = endOffset;
if (index.timeOffsets[startIndex] < startMillis) {
startIndex = Math.abs(Arrays.binarySearch(index.timeOffsets, startMillis));
if (startIndex >= endOffset) {
return new Pair<Integer, Integer>(0, 0);
}
while (startIndex > 0 && index.timeOffsets[startIndex - 1] == startMillis) {
--startIndex;
}
}
if (index.timeOffsets[endIndex - 1] >= endMillis) {
endIndex = Math.abs(Arrays.binarySearch(index.timeOffsets, endMillis));
while (endIndex > startIndex && index.timeOffsets[endIndex - 1] == endMillis) {
--endIndex;
}
}
return new Pair<Integer, Integer>(startIndex, endIndex);
}
private static class TimestampCheckingOffset implements Offset
{
private final Offset baseOffset;
private final long[] timestamps;
private final long threshold;
public TimestampCheckingOffset(
Offset baseOffset,
long[] timestamps,
long threshold
)
{
this.baseOffset = baseOffset;
this.timestamps = timestamps;
this.threshold = threshold;
}
@Override
public int getOffset()
{
return baseOffset.getOffset();
}
@Override
public Offset clone()
{
return new TimestampCheckingOffset(baseOffset.clone(), timestamps, threshold);
}
@Override
public boolean withinBounds()
{
return baseOffset.withinBounds() && timestamps[baseOffset.getOffset()] < threshold;
}
@Override
public void increment()
{
baseOffset.increment();
}
}
private static class IndexBasedBitmapIndexSelector implements BitmapIndexSelector
{
private final Index index;
public IndexBasedBitmapIndexSelector(final Index index)
{
this.index = index;
}
@Override
public Indexed<String> getDimensionValues(final String dimension)
{
return new Indexed<String>()
{
private final String[] dimVals = index.reverseDimLookup.get(dimension.toLowerCase());
@Override
public Class<? extends String> getClazz()
{
return String.class;
}
@Override
public int size()
{
return dimVals.length;
}
@Override
public String get(int index)
{
return dimVals[index];
}
@Override
public int indexOf(String value)
{
return Arrays.binarySearch(dimVals, value);
}
@Override
public Iterator<String> iterator()
{
return Arrays.asList(dimVals).iterator();
}
};
}
@Override
public int getNumRows()
{
return index.timeOffsets.length;
}
@Override
public ImmutableConciseSet getConciseInvertedIndex(String dimension, String value)
{
return index.getInvertedIndex(dimension.toLowerCase(), value);
}
}
}


@ -35,6 +35,7 @@ import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.brita.BitmapIndexSelector;
import com.metamx.druid.index.brita.Filter;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.index.column.ColumnCapabilities;
import com.metamx.druid.index.column.ColumnSelector;
import com.metamx.druid.index.column.ComplexColumn;
import com.metamx.druid.index.column.DictionaryEncodedColumn;
@ -48,10 +49,12 @@ import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.kv.IndexedIterable;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Map;
@ -254,6 +257,8 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
final Map<String, Object> objectColumnCache = Maps.newHashMap();
final GenericColumn timestamps = index.getTimeColumn().getGenericColumn();
final FunctionalIterator<Cursor> retVal = FunctionalIterator
@ -465,6 +470,134 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String columnName = column.toLowerCase();
Object cachedColumnVals = objectColumnCache.get(columnName);
if (cachedColumnVals == null) {
Column holder = index.getColumn(columnName);
if(holder != null) {
final ColumnCapabilities capabilities = holder.getCapabilities();
if(capabilities.hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
);
}
if(capabilities.isDictionaryEncoded()) {
cachedColumnVals = holder.getDictionaryEncoding();
}
else if(capabilities.getType() == ValueType.COMPLEX) {
cachedColumnVals = holder.getComplexColumn();
}
else {
cachedColumnVals = holder.getGenericColumn();
}
}
if(cachedColumnVals != null) {
objectColumnCache.put(columnName, cachedColumnVals);
}
}
if (cachedColumnVals == null) {
return null;
}
if(cachedColumnVals instanceof GenericColumn) {
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
final ValueType type = columnVals.getType();
if(type == ValueType.FLOAT) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return columnVals.getFloatSingleValueRow(cursorOffset.getOffset());
}
};
}
if(type == ValueType.LONG) {
return new ObjectColumnSelector<Long>()
{
@Override
public Class classOfObject()
{
return Long.TYPE;
}
@Override
public Long get()
{
return columnVals.getLongSingleValueRow(cursorOffset.getOffset());
}
};
}
if(type == ValueType.STRING) {
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.getStringSingleValueRow(cursorOffset.getOffset());
}
};
}
}
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.lookupName(columnVals.getSingleValueRow(cursorOffset.getOffset()));
}
};
}
final ComplexColumn columnVals = (ComplexColumn)cachedColumnVals;
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return columnVals.getClazz();
}
@Override
public Object get()
{
return columnVals.getRowValue(cursorOffset.getOffset());
}
};
}
};
}
}
@ -486,6 +619,11 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
for (ComplexColumn complexColumn : complexColumnCache.values()) {
Closeables.closeQuietly(complexColumn);
}
for (Object column : complexColumnCache.values()) {
if(column instanceof Closeable) {
Closeables.closeQuietly((Closeable)column);
}
}
}
}
);
@ -562,6 +700,8 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
{
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
final Map<String, Object> objectColumnCache = Maps.newHashMap();
final GenericColumn timestamps = index.getTimeColumn().getGenericColumn();
final FunctionalIterator<Cursor> retVal = FunctionalIterator
@ -769,6 +909,133 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String columnName = column.toLowerCase();
Object cachedColumnVals = objectColumnCache.get(columnName);
if (cachedColumnVals == null) {
Column holder = index.getColumn(columnName);
if(holder != null) {
if(holder.getCapabilities().hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
);
}
final ValueType type = holder.getCapabilities().getType();
if(holder.getCapabilities().isDictionaryEncoded()) {
cachedColumnVals = holder.getDictionaryEncoding();
}
else if(type == ValueType.COMPLEX) {
cachedColumnVals = holder.getComplexColumn();
}
else {
cachedColumnVals = holder.getGenericColumn();
}
}
if(cachedColumnVals != null) {
objectColumnCache.put(columnName, cachedColumnVals);
}
}
if (cachedColumnVals == null) {
return null;
}
if(cachedColumnVals instanceof GenericColumn) {
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
final ValueType type = columnVals.getType();
if(type == ValueType.FLOAT) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return columnVals.getFloatSingleValueRow(currRow);
}
};
}
if(type == ValueType.LONG) {
return new ObjectColumnSelector<Long>()
{
@Override
public Class classOfObject()
{
return Long.TYPE;
}
@Override
public Long get()
{
return columnVals.getLongSingleValueRow(currRow);
}
};
}
if(type == ValueType.STRING) {
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.getStringSingleValueRow(currRow);
}
};
}
}
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.lookupName(columnVals.getSingleValueRow(currRow));
}
};
}
final ComplexColumn columnVals = (ComplexColumn)cachedColumnVals;
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return columnVals.getClazz();
}
@Override
public Object get()
{
return columnVals.getRowValue(currRow);
}
};
}
};
}
}
@ -788,6 +1055,9 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
for (ComplexColumn complexColumn : complexColumnCache.values()) {
Closeables.closeQuietly(complexColumn);
}
for (Object column : objectColumnCache.values()) {
if(column instanceof Closeable) Closeables.closeQuietly((Closeable)column);
}
}
}
);


@ -19,12 +19,12 @@
package com.metamx.druid.index.v1.processing;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import org.joda.time.DateTime;
/**
*/
public interface Cursor extends MetricSelectorFactory, DimensionSelectorFactory
public interface Cursor extends ColumnSelectorFactory, DimensionSelectorFactory
{
public DateTime getTime();
public void advance();
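
Since Cursor now extends ColumnSelectorFactory, a consumer can ask a cursor directly for an object selector while scanning. A hedged sketch, not part of this commit: the helper class and column name are invented, and isDone()/advance() are the cursor methods used by the storage adapters elsewhere in this diff.

import com.metamx.druid.index.v1.processing.Cursor;
import com.metamx.druid.processing.ObjectColumnSelector;

public class CursorScanExample
{
  // Sums a numeric column by walking the cursor row by row.
  public static double sumColumn(Cursor cursor, String columnName)
  {
    final ObjectColumnSelector selector = cursor.makeObjectColumnSelector(columnName);
    double sum = 0;
    if (selector == null) {
      return sum;                        // unknown column
    }
    while (!cursor.isDone()) {
      final Object value = selector.get();
      if (value instanceof Number) {
        sum += ((Number) value).doubleValue();
      }
      cursor.advance();
    }
    return sum;
  }
}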

some_file.txt (new file, 0 lines)