Merge branch 'master' of github.com:metamx/druid into spatial

Conflicts:
	server/src/main/java/com/metamx/druid/index/v1/IndexStorageAdapter.java
	server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java
This commit is contained in:
fjy 2013-04-23 11:40:44 -07:00
commit 70937af7bd
53 changed files with 872 additions and 753 deletions

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -278,6 +278,7 @@ public class Initialization
final ServiceInstance serviceInstance =
ServiceInstance.builder()
.name(config.getServiceName().replace('/', ':'))
.address(addressFromHost(config.getHost()))
.port(config.getPort())
.build();
final ServiceDiscovery serviceDiscovery =
@ -361,6 +362,16 @@ public class Initialization
return String.format("%s/%s", basePath, PROP_SUBPATH);
}
public static String addressFromHost(final String host)
{
final int colon = host.indexOf(':');
if (colon < 0) {
return host;
} else {
return host.substring(0, colon);
}
}
/**
* Validate and Resolve Properties.
* Resolve zpaths with props like druid.zk.paths.*Path using druid.zk.paths.base value.

View File

@ -28,6 +28,9 @@ public abstract class ServiceDiscoveryConfig extends CuratorConfig
@Config("druid.service")
public abstract String getServiceName();
@Config("druid.host")
public abstract String getHost();
@Config("druid.port")
public abstract int getPort();

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -21,7 +21,7 @@ package com.metamx.druid.aggregation;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.util.Comparator;
import java.util.List;
@ -29,7 +29,7 @@ import java.util.List;
/**
* Processing related interface
*
* An AggregatorFactory is an object that knows how to generate an Aggregator using a MetricSelectorFactory.
* An AggregatorFactory is an object that knows how to generate an Aggregator using a ColumnSelectorFactory.
*
* This is useful as an abstraction to allow Aggregator classes to be written in terms of MetricSelector objects
* without making any assumptions about how they are pulling values out of the base data. That is, the data is
@ -48,8 +48,8 @@ import java.util.List;
})
public interface AggregatorFactory
{
public Aggregator factorize(MetricSelectorFactory metricFactory);
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory);
public Aggregator factorize(ColumnSelectorFactory metricFactory);
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory);
public Comparator getComparator();
/**

View File

@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Longs;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.util.Comparator;
import java.util.List;
@ -44,13 +44,13 @@ public class CountAggregatorFactory implements AggregatorFactory
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new CountAggregator(name);
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new CountBufferAggregator();
}

View File

@ -22,7 +22,7 @@ package com.metamx.druid.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Doubles;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -49,7 +49,7 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new DoubleSumAggregator(
name,
@ -58,7 +58,7 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleSumBufferAggregator(metricFactory.makeFloatMetricSelector(fieldName));
}

View File

@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.primitives.Floats;
import com.google.common.primitives.Longs;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import org.apache.commons.codec.binary.Base64;
import java.nio.ByteBuffer;
@ -56,7 +56,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory
for(int i = 0; i < breaksList.size(); ++i) this.breaks[i] = breaksList.get(i);
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new HistogramAggregator(
name,
@ -66,7 +66,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new HistogramBufferAggregator(
metricFactory.makeFloatMetricSelector(fieldName),

View File

@ -20,7 +20,7 @@
package com.metamx.druid.aggregation;
import com.google.common.collect.Lists;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
import java.util.List;
@ -28,7 +28,7 @@ public class JavaScriptAggregator implements Aggregator
{
static interface ScriptAggregator
{
public double aggregate(double current, FloatMetricSelector[] selectorList);
public double aggregate(double current, ObjectColumnSelector[] selectorList);
public double combine(double a, double b);
@ -38,15 +38,15 @@ public class JavaScriptAggregator implements Aggregator
}
private final String name;
private final FloatMetricSelector[] selectorList;
private final ObjectColumnSelector[] selectorList;
private final ScriptAggregator script;
private volatile double current;
public JavaScriptAggregator(String name, List<FloatMetricSelector> selectorList, ScriptAggregator script)
public JavaScriptAggregator(String name, List<ObjectColumnSelector> selectorList, ScriptAggregator script)
{
this.name = name;
this.selectorList = Lists.newArrayList(selectorList).toArray(new FloatMetricSelector[]{});
this.selectorList = Lists.newArrayList(selectorList).toArray(new ObjectColumnSelector[]{});
this.script = script;
this.current = script.reset();

View File

@ -21,18 +21,16 @@ package com.metamx.druid.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.primitives.Doubles;
import com.metamx.common.IAE;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import com.metamx.druid.processing.ObjectColumnSelector;
import org.mozilla.javascript.Context;
import org.mozilla.javascript.ContextAction;
import org.mozilla.javascript.ContextFactory;
import org.mozilla.javascript.Function;
import org.mozilla.javascript.Script;
import org.mozilla.javascript.ScriptableObject;
import javax.annotation.Nullable;
@ -48,7 +46,11 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
private final String name;
private final List<String> fieldNames;
private final String script;
private final String fnAggregate;
private final String fnReset;
private final String fnCombine;
private final JavaScriptAggregator.ScriptAggregator compiledScript;
@ -56,26 +58,34 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
public JavaScriptAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldNames") final List<String> fieldNames,
@JsonProperty("script") final String expression
@JsonProperty("fnAggregate") final String fnAggregate,
@JsonProperty("fnReset") final String fnReset,
@JsonProperty("fnCombine") final String fnCombine
)
{
this.name = name;
this.script = expression;
this.fieldNames = fieldNames;
this.compiledScript = compileScript(script);
this.fnAggregate = fnAggregate;
this.fnReset = fnReset;
this.fnCombine = fnCombine;
this.compiledScript = compileScript(fnAggregate, fnReset, fnCombine);
}
@Override
public Aggregator factorize(final MetricSelectorFactory metricFactory)
public Aggregator factorize(final ColumnSelectorFactory columnFactory)
{
return new JavaScriptAggregator(
name,
Lists.transform(
fieldNames,
new com.google.common.base.Function<String, FloatMetricSelector>()
new com.google.common.base.Function<String, ObjectColumnSelector>()
{
@Override
public FloatMetricSelector apply(@Nullable String s) { return metricFactory.makeFloatMetricSelector(s); }
public ObjectColumnSelector apply(@Nullable String s) {
return columnFactory.makeObjectColumnSelector(s);
}
}
),
compiledScript
@ -83,17 +93,16 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
}
@Override
public BufferAggregator factorizeBuffered(final MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(final ColumnSelectorFactory columnSelectorFactory)
{
return new JavaScriptBufferAggregator(
Lists.transform(
fieldNames,
new com.google.common.base.Function<String, FloatMetricSelector>()
new com.google.common.base.Function<String, ObjectColumnSelector>()
{
@Override
public FloatMetricSelector apply(@Nullable String s)
{
return metricFactory.makeFloatMetricSelector(s);
public ObjectColumnSelector apply(@Nullable String s) {
return columnSelectorFactory.makeObjectColumnSelector(s);
}
}
),
@ -116,7 +125,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
@Override
public AggregatorFactory getCombiningFactory()
{
throw new UnsupportedOperationException();
return new JavaScriptAggregatorFactory(name, Lists.newArrayList(name), fnCombine, fnReset, fnCombine);
}
@Override
@ -144,8 +153,21 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
}
@JsonProperty
public String getScript() {
return script;
public String getFnAggregate()
{
return fnAggregate;
}
@JsonProperty
public String getFnReset()
{
return fnReset;
}
@JsonProperty
public String getFnCombine()
{
return fnCombine;
}
@Override
@ -159,8 +181,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
{
try {
MessageDigest md = MessageDigest.getInstance("SHA-1");
byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes();
byte[] sha1 = md.digest(script.getBytes());
byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes(Charsets.UTF_8);
byte[] sha1 = md.digest((fnAggregate+fnReset+fnCombine).getBytes(Charsets.UTF_8));
return ByteBuffer.allocate(1 + fieldNameBytes.length + sha1.length)
.put(CACHE_TYPE_ID)
@ -197,21 +219,13 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
return "JavaScriptAggregatorFactory{" +
"name='" + name + '\'' +
", fieldNames=" + fieldNames +
", script='" + script + '\'' +
", fnAggregate='" + fnAggregate + '\'' +
", fnReset='" + fnReset + '\'' +
", fnCombine='" + fnCombine + '\'' +
'}';
}
protected static Function getScriptFunction(String name, ScriptableObject scope)
{
Object fun = scope.get(name, scope);
if (fun instanceof Function) {
return (Function) fun;
} else {
throw new IAE("Function [%s] not defined in script", name);
}
}
public static JavaScriptAggregator.ScriptAggregator compileScript(final String script)
public static JavaScriptAggregator.ScriptAggregator compileScript(final String aggregate, final String reset, final String combine)
{
final ContextFactory contextFactory = ContextFactory.getGlobal();
Context context = contextFactory.enterContext();
@ -219,18 +233,15 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
final ScriptableObject scope = context.initStandardObjects();
Script compiledScript = context.compileString(script, "script", 1, null);
compiledScript.exec(context, scope);
final Function fnAggregate = getScriptFunction("aggregate", scope);
final Function fnReset = getScriptFunction("reset", scope);
final Function fnCombine = getScriptFunction("combine", scope);
final Function fnAggregate = context.compileFunction(scope, aggregate, "aggregate", 1, null);
final Function fnReset = context.compileFunction(scope, reset, "reset", 1, null);
final Function fnCombine = context.compileFunction(scope, combine, "combine", 1, null);
Context.exit();
return new JavaScriptAggregator.ScriptAggregator()
{
@Override
public double aggregate(final double current, final FloatMetricSelector[] selectorList)
public double aggregate(final double current, final ObjectColumnSelector[] selectorList)
{
Context cx = Context.getCurrentContext();
if(cx == null) cx = contextFactory.enterContext();

View File

@ -20,22 +20,24 @@
package com.metamx.druid.aggregation;
import com.google.common.collect.Lists;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
import java.nio.ByteBuffer;
import java.util.List;
public class JavaScriptBufferAggregator implements BufferAggregator
{
private final FloatMetricSelector[] selectorList;
private final ObjectColumnSelector[] selectorList;
private final JavaScriptAggregator.ScriptAggregator script;
public JavaScriptBufferAggregator(
List<FloatMetricSelector> selectorList,
List<ObjectColumnSelector> selectorList,
JavaScriptAggregator.ScriptAggregator script
)
{
this.selectorList = Lists.newArrayList(selectorList).toArray(new FloatMetricSelector[]{});
this.selectorList = Lists.newArrayList(selectorList).toArray(new ObjectColumnSelector[]{});
this.script = script;
}

View File

@ -22,7 +22,7 @@ package com.metamx.druid.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Longs;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -49,7 +49,7 @@ public class LongSumAggregatorFactory implements AggregatorFactory
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new LongSumAggregator(
name,
@ -58,7 +58,7 @@ public class LongSumAggregatorFactory implements AggregatorFactory
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new LongSumBufferAggregator(metricFactory.makeFloatMetricSelector(fieldName));
}

View File

@ -22,7 +22,7 @@ package com.metamx.druid.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Doubles;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -49,13 +49,13 @@ public class MaxAggregatorFactory implements AggregatorFactory
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new MaxAggregator(name, metricFactory.makeFloatMetricSelector(fieldName));
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new MaxBufferAggregator(metricFactory.makeFloatMetricSelector(fieldName));
}

View File

@ -22,7 +22,7 @@ package com.metamx.druid.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Doubles;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -49,13 +49,13 @@ public class MinAggregatorFactory implements AggregatorFactory
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new MinAggregator(name, metricFactory.makeFloatMetricSelector(fieldName));
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new MinBufferAggregator(metricFactory.makeFloatMetricSelector(fieldName));
}

View File

@ -19,7 +19,7 @@
package com.metamx.druid.aggregation;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import java.util.Comparator;
import java.util.List;
@ -36,13 +36,13 @@ public class ToLowerCaseAggregatorFactory implements AggregatorFactory
}
@Override
public Aggregator factorize(MetricSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return baseAggregatorFactory.factorize(metricFactory);
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return baseAggregatorFactory.factorizeBuffered(metricFactory);
}

View File

@ -71,6 +71,62 @@ public class DbConnector
);
}
public static void createTaskTable(final DBI dbi, final String taskTableName)
{
createTable(
dbi,
taskTableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` varchar(255) NOT NULL,\n"
+ " `created_date` tinytext NOT NULL,\n"
+ " `datasource` varchar(255) NOT NULL,\n"
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (`id`)\n"
+ ")",
taskTableName
)
);
}
public static void createTaskLogTable(final DBI dbi, final String taskLogsTableName)
{
createTable(
dbi,
taskLogsTableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `log_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLogsTableName
)
);
}
public static void createTaskLockTable(final DBI dbi, final String taskLocksTableName)
{
createTable(
dbi,
taskLocksTableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `lock_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLocksTableName
)
);
}
public static void createTable(
final DBI dbi,
final String tableName,
@ -125,6 +181,11 @@ public class DbConnector
dataSource.setPassword(config.getDatabasePassword());
dataSource.setUrl(config.getDatabaseConnectURI());
if (config.isValidationQuery()) {
dataSource.setValidationQuery(config.getValidationQuery());
dataSource.setTestOnBorrow(true);
}
return dataSource;
}
}

View File

@ -41,4 +41,16 @@ public abstract class DbConnectorConfig
@JsonProperty("segmentTable")
@Config("druid.database.segmentTable")
public abstract String getSegmentTable();
@JsonProperty("validationQuery")
@Config("druid.database.validation")
public boolean isValidationQuery() {
return false;
}
@JsonProperty("validationQuery")
@Config("druid.database.validationQuery")
public String getValidationQuery() {
return "SELECT 1";
}
}

View File

@ -22,8 +22,9 @@ package com.metamx.druid.processing;
/**
* Factory class for MetricSelectors
*/
public interface MetricSelectorFactory
public interface ColumnSelectorFactory
{
public FloatMetricSelector makeFloatMetricSelector(String metricName);
public ComplexMetricSelector makeComplexMetricSelector(String metricName);
public ObjectColumnSelector makeObjectColumnSelector(String columnName);
}

View File

@ -1,5 +0,0 @@
package com.metamx.druid.processing;
public class MetricSelectorUtils
{
}

View File

@ -0,0 +1,26 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.processing;
public interface ObjectColumnSelector<T>
{
public Class<T> classOfObject();
public T get();
}

View File

@ -20,25 +20,29 @@
package com.metamx.druid.aggregation;
import com.google.common.collect.Lists;
import com.google.common.primitives.Doubles;
import com.metamx.druid.processing.FloatMetricSelector;
import com.google.common.collect.Maps;
import com.metamx.druid.processing.ObjectColumnSelector;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
public class JavaScriptAggregatorTest
{
protected static final String sumLogATimesBPlusTen =
"function aggregate(current, a, b) { return current + (Math.log(a) * b) }"
+ "function combine(a,b) { return a + b }"
+ "function reset() { return 10 }";
protected static final Map<String, String> sumLogATimesBPlusTen = Maps.newHashMap();
protected static final Map<String, String> scriptDoubleSum = Maps.newHashMap();
protected static final String scriptDoubleSum =
"function aggregate(current, a) { return current + a }"
+ "function combine(a,b) { return a + b }"
+ "function reset() { return 0 }";
static {
sumLogATimesBPlusTen.put("fnAggregate", "function aggregate(current, a, b) { return current + (Math.log(a) * b) }");
sumLogATimesBPlusTen.put("fnReset", "function reset() { return 10 }");
sumLogATimesBPlusTen.put("fnCombine", "function combine(a,b) { return a + b }");
scriptDoubleSum.put("fnAggregate", "function aggregate(current, a) { return current + a }");
scriptDoubleSum.put("fnReset", "function reset() { return 0 }");
scriptDoubleSum.put("fnCombine", "function combine(a,b) { return a + b }");
}
private static void aggregate(TestFloatMetricSelector selector1, TestFloatMetricSelector selector2, Aggregator agg)
{
@ -69,10 +73,14 @@ public class JavaScriptAggregatorTest
final TestFloatMetricSelector selector1 = new TestFloatMetricSelector(new float[]{42.12f, 9f});
final TestFloatMetricSelector selector2 = new TestFloatMetricSelector(new float[]{2f, 3f});
Map<String, String> script = sumLogATimesBPlusTen;
JavaScriptAggregator agg = new JavaScriptAggregator(
"billy",
Arrays.<FloatMetricSelector>asList(selector1, selector2),
JavaScriptAggregatorFactory.compileScript(sumLogATimesBPlusTen)
Arrays.<ObjectColumnSelector>asList(MetricSelectorUtils.wrap(selector1), MetricSelectorUtils.wrap(selector2)),
JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"),
script.get("fnReset"),
script.get("fnCombine"))
);
agg.reset();
@ -103,9 +111,12 @@ public class JavaScriptAggregatorTest
final TestFloatMetricSelector selector1 = new TestFloatMetricSelector(new float[]{42.12f, 9f});
final TestFloatMetricSelector selector2 = new TestFloatMetricSelector(new float[]{2f, 3f});
Map<String, String> script = sumLogATimesBPlusTen;
JavaScriptBufferAggregator agg = new JavaScriptBufferAggregator(
Arrays.<FloatMetricSelector>asList(selector1, selector2),
JavaScriptAggregatorFactory.compileScript(sumLogATimesBPlusTen)
Arrays.<ObjectColumnSelector>asList(MetricSelectorUtils.wrap(selector1), MetricSelectorUtils.wrap(selector2)),
JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"),
script.get("fnReset"),
script.get("fnCombine"))
);
ByteBuffer buf = ByteBuffer.allocateDirect(32);
@ -150,10 +161,13 @@ public class JavaScriptAggregatorTest
}
*/
Map<String, String> script = scriptDoubleSum;
JavaScriptAggregator aggRhino = new JavaScriptAggregator(
"billy",
Lists.asList(selector, new FloatMetricSelector[]{}),
JavaScriptAggregatorFactory.compileScript(scriptDoubleSum)
Lists.asList(MetricSelectorUtils.wrap(selector), new ObjectColumnSelector[]{}),
JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"),
script.get("fnReset"),
script.get("fnCombine"))
);
DoubleSumAggregator doubleAgg = new DoubleSumAggregator("billy", selector);

View File

@ -0,0 +1,45 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.aggregation;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
public class MetricSelectorUtils
{
public static ObjectColumnSelector<Float> wrap(final FloatMetricSelector selector)
{
return new ObjectColumnSelector<Float>()
{
@Override
public Class<Float> classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return selector.get();
}
};
}
}

View File

@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
</parent>
<modules>

View File

@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -37,15 +37,21 @@ import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.Aggregator;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.post.PostAggregator;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.index.column.ComplexColumn;
import com.metamx.druid.index.column.DictionaryEncodedColumn;
import com.metamx.druid.index.column.GenericColumn;
import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.index.v1.serde.ComplexMetricExtractor;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedRow;
import com.metamx.druid.input.Row;
import com.metamx.druid.processing.ColumnSelectorFactory;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ObjectColumnSelector;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -179,7 +185,7 @@ public class IncrementalIndex implements Iterable<Row>
for (int i = 0; i < metrics.length; ++i) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
new MetricSelectorFactory()
new ColumnSelectorFactory()
{
@Override
public FloatMetricSelector makeFloatMetricSelector(String metric)
@ -199,6 +205,7 @@ public class IncrementalIndex implements Iterable<Row>
public ComplexMetricSelector makeComplexMetricSelector(final String metric)
{
final String typeName = agg.getTypeName();
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
@ -223,6 +230,53 @@ public class IncrementalIndex implements Iterable<Row>
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String typeName = agg.getTypeName();
final String columnName = column.toLowerCase();
if(typeName.equals("float")) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return in.getFloatMetric(columnName);
}
};
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
@Override
public Object get()
{
return extractor.extractValue(in, columnName);
}
};
}
}
);
}

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -40,8 +40,10 @@ import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.GracefulShutdownFirehose;
import com.metamx.druid.realtime.MinTimeFirehose;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.Schema;
@ -62,19 +64,22 @@ public class RealtimeIndexTask extends AbstractTask
private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);
@JsonIgnore
final Schema schema;
private final Schema schema;
@JsonIgnore
final FirehoseFactory firehoseFactory;
private final FirehoseFactory firehoseFactory;
@JsonIgnore
final FireDepartmentConfig fireDepartmentConfig;
private final FireDepartmentConfig fireDepartmentConfig;
@JsonIgnore
final Period windowPeriod;
private final Period windowPeriod;
@JsonIgnore
final IndexGranularity segmentGranularity;
private final IndexGranularity segmentGranularity;
@JsonIgnore
private final DateTime minTime;
@JsonIgnore
private volatile Plumber plumber = null;
@ -95,7 +100,8 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, // TODO rename?
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("minTime") DateTime minTime
)
{
super(
@ -116,6 +122,7 @@ public class RealtimeIndexTask extends AbstractTask
this.fireDepartmentConfig = fireDepartmentConfig;
this.windowPeriod = windowPeriod;
this.segmentGranularity = segmentGranularity;
this.minTime = minTime;
}
@Override
@ -156,7 +163,19 @@ public class RealtimeIndexTask extends AbstractTask
if (shutdown) {
return TaskStatus.success(getId());
}
firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);
Firehose wrappedFirehose = firehoseFactory.connect();
if (minTime != null) {
log.info("Wrapping firehose in MinTimeFirehose with minTime[%s]", minTime);
wrappedFirehose = new MinTimeFirehose(wrappedFirehose, minTime);
}
log.info(
"Wrapping firehose in GracefulShutdownFirehose with segmentGranularity[%s] and windowPeriod[%s]",
segmentGranularity,
windowPeriod
);
firehose = new GracefulShutdownFirehose(wrappedFirehose, segmentGranularity, windowPeriod);
}
// TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like
@ -347,6 +366,12 @@ public class RealtimeIndexTask extends AbstractTask
return segmentGranularity;
}
@JsonProperty
public DateTime getMinTime()
{
return minTime;
}
public static class TaskActionSegmentPublisher implements SegmentPublisher
{
final Task task;

View File

@ -82,15 +82,16 @@ public class DbTaskStorage implements TaskStorage
{
handle.createStatement(
String.format(
"INSERT INTO %s (id, created_date, payload, status_code, status_payload) VALUES (:id, :created_date, :payload, :status_code, :status_payload)",
"INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
dbConnectorConfig.getTaskTable()
)
)
.bind("id", task.getId())
.bind("created_date", new DateTime().toString())
.bind("payload", jsonMapper.writeValueAsString(task))
.bind("status_code", status.getStatusCode().toString())
.bind("status_payload", jsonMapper.writeValueAsString(status))
.bind("datasource", task.getDataSource())
.bind("payload", jsonMapper.writeValueAsBytes(task))
.bind("active", status.isRunnable() ? 1 : 0)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();
return null;
@ -122,21 +123,20 @@ public class DbTaskStorage implements TaskStorage
{
return handle.createStatement(
String.format(
"UPDATE %s SET status_code = :status_code, status_payload = :status_payload WHERE id = :id AND status_code = :old_status_code",
"UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1",
dbConnectorConfig.getTaskTable()
)
)
.bind("id", status.getId())
.bind("status_code", status.getStatusCode().toString())
.bind("old_status_code", TaskStatus.Status.RUNNING.toString())
.bind("status_payload", jsonMapper.writeValueAsString(status))
.bind("active", status.isRunnable() ? 1 : 0)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();
}
}
);
if(updated != 1) {
throw new IllegalStateException(String.format("Running task not found: %s", status.getId()));
throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
}
}
@ -163,7 +163,7 @@ public class DbTaskStorage implements TaskStorage
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbTasks);
return Optional.of(jsonMapper.readValue(dbStatus.get("payload").toString(), Task.class));
return Optional.of(jsonMapper.readValue((byte[])dbStatus.get("payload"), Task.class));
}
}
}
@ -193,7 +193,7 @@ public class DbTaskStorage implements TaskStorage
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbStatuses);
return Optional.of(jsonMapper.readValue(dbStatus.get("status_payload").toString(), TaskStatus.class));
return Optional.of(jsonMapper.readValue((byte[])dbStatus.get("status_payload"), TaskStatus.class));
}
}
}
@ -201,34 +201,40 @@ public class DbTaskStorage implements TaskStorage
}
@Override
public List<String> getRunningTaskIds()
public List<Task> getRunningTasks()
{
return dbi.withHandle(
new HandleCallback<List<String>>()
new HandleCallback<List<Task>>()
{
@Override
public List<String> withHandle(Handle handle) throws Exception
public List<Task> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT id FROM %s WHERE status_code = :status_code",
"SELECT id, payload, status_payload FROM %s WHERE active = 1",
dbConnectorConfig.getTaskTable()
)
)
.bind("status_code", TaskStatus.Status.RUNNING.toString())
.list();
return Lists.transform(
dbTasks, new Function<Map<String, Object>, String>()
{
@Override
public String apply(Map<String, Object> row)
{
return row.get("id").toString();
final ImmutableList.Builder<Task> tasks = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
try {
final Task task = jsonMapper.readValue((byte[])row.get("payload"), Task.class);
final TaskStatus status = jsonMapper.readValue((byte[])row.get("status_payload"), TaskStatus.class);
if (status.isRunnable()) {
tasks.add(task);
}
} catch (Exception e) {
log.makeAlert(e, "Failed to parse task payload").addData("task", id).emit();
}
}
);
return tasks.build();
}
}
);
@ -260,7 +266,7 @@ public class DbTaskStorage implements TaskStorage
)
)
.bind("task_id", taskid)
.bind("lock_payload", jsonMapper.writeValueAsString(taskLock))
.bind("lock_payload", jsonMapper.writeValueAsBytes(taskLock))
.execute();
}
}
@ -340,7 +346,7 @@ public class DbTaskStorage implements TaskStorage
)
)
.bind("task_id", task.getId())
.bind("log_payload", jsonMapper.writeValueAsString(taskAction))
.bind("log_payload", jsonMapper.writeValueAsBytes(taskAction))
.execute();
}
}
@ -373,7 +379,7 @@ public class DbTaskStorage implements TaskStorage
public TaskAction apply(Map<String, Object> row)
{
try {
return jsonMapper.readValue(row.get("log_payload").toString(), TaskAction.class);
return jsonMapper.readValue((byte[])row.get("log_payload"), TaskAction.class);
} catch(Exception e) {
throw Throwables.propagate(e);
}
@ -405,7 +411,7 @@ public class DbTaskStorage implements TaskStorage
final Map<Long, TaskLock> retMap = Maps.newHashMap();
for(final Map<String, Object> row : dbTaskLocks) {
retMap.put((Long)row.get("id"), jsonMapper.readValue(row.get("lock_payload").toString(), TaskLock.class));
retMap.put((Long)row.get("id"), jsonMapper.readValue((byte[])row.get("lock_payload"), TaskLock.class));
}
return retMap;
}

View File

@ -23,6 +23,8 @@ import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@ -81,7 +83,28 @@ public class ExecutorServiceTaskRunner implements TaskRunner, QuerySegmentWalker
public ListenableFuture<TaskStatus> run(final Task task)
{
final TaskToolbox toolbox = toolboxFactory.build(task);
return exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox));
final ListenableFuture<TaskStatus> statusFuture = exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox));
final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture, null, new DateTime());
runningItems.add(taskRunnerWorkItem);
Futures.addCallback(
statusFuture, new FutureCallback<TaskStatus>()
{
@Override
public void onSuccess(TaskStatus result)
{
runningItems.remove(taskRunnerWorkItem);
}
@Override
public void onFailure(Throwable t)
{
runningItems.remove(taskRunnerWorkItem);
}
}
);
return statusFuture;
}
@Override
@ -97,34 +120,13 @@ public class ExecutorServiceTaskRunner implements TaskRunner, QuerySegmentWalker
@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
return runningItems;
return ImmutableList.copyOf(runningItems);
}
@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
if (exec instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
return Lists.newArrayList(
FunctionalIterable.create(tpe.getQueue())
.keep(
new Function<Runnable, TaskRunnerWorkItem>()
{
@Override
public TaskRunnerWorkItem apply(Runnable input)
{
if (input instanceof ExecutorServiceTaskRunnerCallable) {
return ((ExecutorServiceTaskRunnerCallable) input).getTaskRunnerWorkItem();
}
return null;
}
}
)
);
}
return Lists.newArrayList();
return ImmutableList.of();
}
@Override

View File

@ -125,11 +125,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
// time to adjust process holders
synchronized (tasks) {
if (Thread.interrupted()) {
throw new InterruptedException();
final TaskInfo taskInfo = tasks.get(task.getId());
if (taskInfo.shutdown) {
throw new IllegalStateException("Task has been shut down!");
}
final TaskInfo taskInfo = tasks.get(task.getId());
if (taskInfo == null) {
throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId());
}
@ -159,7 +160,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
String.format(
"-D%s=%s",
propName.substring(CHILD_PROPERTY_PREFIX.length()),
System.getProperty(propName)
props.getProperty(propName)
)
);
}
@ -227,11 +228,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
return TaskStatus.failure(task.getId());
}
}
catch (InterruptedException e) {
log.info(e, "Interrupted during execution");
return TaskStatus.failure(task.getId());
}
catch (IOException e) {
catch (Exception e) {
log.info(e, "Exception caught during execution");
throw Throwables.propagate(e);
}
finally {
@ -288,9 +286,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
log.info("Ignoring request to cancel unknown task: %s", taskid);
return;
}
}
taskInfo.statusFuture.cancel(true);
taskInfo.shutdown = true;
}
if (taskInfo.processHolder != null) {
final int shutdowns = taskInfo.processHolder.shutdowns.getAndIncrement();
@ -397,6 +395,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
private static class TaskInfo
{
private final ListenableFuture<TaskStatus> statusFuture;
private volatile boolean shutdown = false;
private volatile ProcessHolder processHolder = null;
private TaskInfo(ListenableFuture<TaskStatus> statusFuture)

View File

@ -128,15 +128,15 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
@Override
public List<String> getRunningTaskIds()
public List<Task> getRunningTasks()
{
giant.lock();
try {
final ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
for(final TaskStuff taskStuff : tasks.values()) {
if(taskStuff.getStatus().isRunnable()) {
listBuilder.add(taskStuff.getTask().getId());
listBuilder.add(taskStuff.getTask());
}
}

View File

@ -573,9 +573,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
if (taskStatus.isComplete()) {
if (taskRunnerWorkItem != null) {
final SettableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
if (result != null) {
result.set(taskStatus);
((SettableFuture<TaskStatus>) result).set(taskStatus);
}
}

View File

@ -98,10 +98,8 @@ public class TaskQueue
// Get all running tasks and their locks
final Multimap<TaskLock, Task> tasksByLock = ArrayListMultimap.create();
for (final String taskId : taskStorage.getRunningTaskIds()) {
for (final Task task : taskStorage.getRunningTasks()) {
try {
// .get since TaskStorage semantics should mean this task is always found
final Task task = taskStorage.getTask(taskId).get();
final List<TaskLock> taskLocks = taskStorage.getLocks(task.getId());
queue.add(task);
@ -111,16 +109,8 @@ public class TaskQueue
}
}
catch (Exception e) {
log.makeAlert("Failed to bootstrap task").addData("task", taskId).emit();
// A bit goofy to special-case JsonProcessingException, but we don't want to suppress bootstrap problems on
// any old Exception or even IOException...
if (e instanceof JsonProcessingException || e.getCause() instanceof JsonProcessingException) {
// Mark this task a failure, and continue bootstrapping
taskStorage.setStatus(TaskStatus.failure(taskId));
} else {
throw Throwables.propagate(e);
}
log.makeAlert("Failed to bootstrap task").addData("task", task.getId()).emit();
throw Throwables.propagate(e);
}
}

View File

@ -20,6 +20,9 @@
package com.metamx.druid.merger.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.druid.merger.common.RetryPolicy;
import com.metamx.druid.merger.common.TaskStatus;
@ -33,7 +36,7 @@ import org.joda.time.DateTimeComparator;
public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
{
private final Task task;
private final SettableFuture<TaskStatus> result;
private final ListenableFuture<TaskStatus> result;
private final RetryPolicy retryPolicy;
private final DateTime createdTime;
@ -41,7 +44,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
public TaskRunnerWorkItem(
Task task,
SettableFuture<TaskStatus> result,
ListenableFuture<TaskStatus> result,
RetryPolicy retryPolicy,
DateTime createdTime
)
@ -58,7 +61,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
return task;
}
public SettableFuture<TaskStatus> getResult()
public ListenableFuture<TaskStatus> getResult()
{
return result;
}
@ -89,7 +92,10 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
@Override
public int compareTo(TaskRunnerWorkItem taskRunnerWorkItem)
{
return DateTimeComparator.getInstance().compare(createdTime, taskRunnerWorkItem.getCreatedTime());
return ComparisonChain.start()
.compare(createdTime, taskRunnerWorkItem.getCreatedTime(), DateTimeComparator.getInstance())
.compare(task.getId(), taskRunnerWorkItem.getTask().getId())
.result();
}
@Override

View File

@ -77,9 +77,9 @@ public interface TaskStorage
public List<TaskAction> getAuditLogs(String taskid);
/**
* Returns a list of currently-running task IDs as stored in the storage facility, in no particular order.
* Returns a list of currently-running tasks as stored in the storage facility, in no particular order.
*/
public List<String> getRunningTaskIds();
public List<Task> getRunningTasks();
/**
* Returns a list of locks for a particular task.

View File

@ -606,6 +606,10 @@ public class IndexerCoordinatorNode extends RegisteringNode
taskStorage = new HeapMemoryTaskStorage();
} else if (config.getStorageImpl().equals("db")) {
final IndexerDbConnectorConfig dbConnectorConfig = configFactory.build(IndexerDbConnectorConfig.class);
DbConnector.createTaskTable(dbi, dbConnectorConfig.getTaskTable());
DbConnector.createTaskLogTable(dbi, dbConnectorConfig.getTaskLogTable());
DbConnector.createTaskLockTable(dbi, dbConnectorConfig.getTaskLockTable());
taskStorage = new DbTaskStorage(
getJsonMapper(),
dbConnectorConfig,

View File

@ -30,11 +30,11 @@ import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.merger.common.tasklogs.TaskLogProvider;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.merger.common.actions.TaskActionHolder;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.common.tasklogs.TaskLogProvider;
import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner;

View File

@ -205,7 +205,8 @@ public class TaskSerdeTest
null,
null,
new Period("PT10M"),
IndexGranularity.HOUR
IndexGranularity.HOUR,
null
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();

View File

@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>

Binary file not shown.

View File

@ -123,7 +123,7 @@ core engine of the Metamarkets data analytics platform. In this paper, we detail
\section{Introduction}
Enterprises routinely collect diverse data sets that can contain up to terabytes of information per day. Companies are increasingly realizing the importance of efficiently storing and analyzing this data in order to increase both productivity and profitability. Numerous database systems (e.g., IBMs Netezza \cite{singh2011introduction}, HP's Vertica \cite{bear2012vertica}, EMCs Greenplum \cite{miner2012unified}) and several research papers \cite{barroso2009datacenter, chaudhuri1997overview, dewitt1992parallel} offer solutions for how to store and extract information from large data sets. However, many of these Relational Database Management Systems (RDBMS) and NoSQL architectures do not support interactive queries and real-time data ingestion.
Metamarkets built Druid to directly address the need for a real-time analytical data store in the big-data ecosystem. Druid shares some similarities with main-memory databases \cite{farber2012sap} and interactive query systems such as Dremel \cite{melnik2010dremel} and PowerDrill \cite{hall2012processing}. Druid's focus is fast aggregations, arbitrarily deep data exploration, and low-latency data ingestion. Furthermore, Druid is highly configurable and allows users to easily adjust fault tolerance and performance properties. Queries on in-memory data typically complete in millseconds, and real-time data ingestion means that new events are immediately available for analysis.
Metamarkets built Druid to directly address the need for a real-time analytical data store in the big-data ecosystem. Druid shares some similarities with main-memory databases \cite{farber2012sap} and interactive query systems such as Dremel \cite{melnik2010dremel} and PowerDrill \cite{hall2012processing}. Druid's focus is fast aggregations, arbitrarily deep data exploration, and low-latency data ingestion. Furthermore, Druid is highly configurable and allows users to easily adjust fault tolerance and performance properties. Queries on in-memory data typically complete in milliseconds, and real-time data ingestion means that new events are immediately available for analysis.
In this paper, we make the following contributions:
\begin{itemize}

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -0,0 +1,68 @@
package com.metamx.druid.realtime;
import com.metamx.druid.input.InputRow;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.NoSuchElementException;
/**
* Provides a view on a firehose that only returns rows at or after a certain minimum timestamp.
* Not thread-safe.
*/
public class MinTimeFirehose implements Firehose
{
private final Firehose firehose;
private final DateTime minTime;
private InputRow savedInputRow = null;
public MinTimeFirehose(Firehose firehose, DateTime minTime)
{
this.firehose = firehose;
this.minTime = minTime;
}
@Override
public boolean hasMore()
{
if (savedInputRow != null) {
return true;
}
while (firehose.hasMore()) {
final InputRow row = firehose.nextRow();
if (acceptable(row)) {
savedInputRow = row;
return true;
}
}
return false;
}
@Override
public InputRow nextRow()
{
final InputRow row = savedInputRow;
savedInputRow = null;
return row;
}
@Override
public Runnable commit()
{
return firehose.commit();
}
@Override
public void close() throws IOException
{
firehose.close();
}
private boolean acceptable(InputRow row)
{
return row.getTimestampFromEpoch() >= minTime.getMillis();
}
}

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.3.36-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -43,6 +43,7 @@ import com.metamx.druid.index.v1.serde.ComplexMetrics;
import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
import com.metamx.druid.query.search.SearchHit;
import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.query.search.SearchQuerySpec;
@ -359,6 +360,61 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String columnName = column.toLowerCase();
final Integer metricIndexInt = index.getMetricIndex(columnName);
if(metricIndexInt != null) {
final int metricIndex = metricIndexInt;
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(columnName));
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return serde.getObjectStrategy().getClazz();
}
@Override
public Object get()
{
return currEntry.getValue()[metricIndex].get();
}
};
}
final Integer dimensionIndexInt = index.getDimensionIndex(columnName);
if(dimensionIndexInt != null) {
final int dimensionIndex = dimensionIndexInt;
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex];
if(dimVals.length == 1) return dimVals[0];
if(dimVals.length == 0) return null;
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
);
}
};
}
return null;
}
};
}
}

View File

@ -1,542 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.metamx.common.Pair;
import com.metamx.common.collect.MoreIterators;
import com.metamx.common.guava.FunctionalIterator;
import com.metamx.common.logger.Logger;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.druid.BaseStorageAdapter;
import com.metamx.druid.Capabilities;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.index.brita.BitmapIndexSelector;
import com.metamx.druid.index.brita.Filter;
import com.metamx.druid.index.v1.processing.ArrayBasedOffset;
import com.metamx.druid.index.v1.processing.Cursor;
import com.metamx.druid.index.v1.processing.DimensionSelector;
import com.metamx.druid.index.v1.processing.Offset;
import com.metamx.druid.index.v1.processing.StartLimitedOffset;
import com.metamx.druid.kv.ArrayBasedIndexedInts;
import com.metamx.druid.kv.ArrayIndexed;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedFloats;
import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.kv.ListIndexed;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
/**
*/
public class IndexStorageAdapter extends BaseStorageAdapter
{
private final Logger log = new Logger(IndexStorageAdapter.class);
private final Index index;
private final int[] ids;
private final Capabilities capabilities;
public IndexStorageAdapter(
Index index
)
{
this.index = index;
capabilities = Capabilities.builder()
.dimensionValuesSorted(isReverseDimSorted())
.build();
ids = new int[index.timeOffsets.length];
for (int i = 0; i < ids.length; i++) {
ids[i] = i;
}
}
@Override
public String getSegmentIdentifier()
{
throw new UnsupportedOperationException();
}
@Override
public Interval getInterval()
{
return index.dataInterval;
}
@Override
public int getDimensionCardinality(String dimension)
{
final String[] strings = index.reverseDimLookup.get(dimension);
return strings == null ? 0 : strings.length;
}
@Override
public DateTime getMinTime()
{
return new DateTime(index.timeOffsets[0]);
}
@Override
public DateTime getMaxTime()
{
return new DateTime(index.timeOffsets[index.timeOffsets.length - 1]);
}
@Override
public Iterable<Cursor> makeCursors(final Filter filter, final Interval interval, final QueryGranularity gran)
{
Interval actualIntervalTmp = interval;
if (!actualIntervalTmp.overlaps(index.dataInterval)) {
return ImmutableList.of();
}
if (actualIntervalTmp.getStart().isBefore(index.dataInterval.getStart())) {
actualIntervalTmp = actualIntervalTmp.withStart(index.dataInterval.getStart());
}
if (actualIntervalTmp.getEnd().isAfter(index.dataInterval.getEnd())) {
actualIntervalTmp = actualIntervalTmp.withEnd(index.dataInterval.getEnd());
}
final Interval actualInterval = actualIntervalTmp;
final Pair<Integer, Integer> intervalStartAndEnd = computeTimeStartEnd(actualInterval);
return new Iterable<Cursor>()
{
@Override
public Iterator<Cursor> iterator()
{
final Offset baseOffset;
if (filter == null) {
baseOffset = new ArrayBasedOffset(ids, intervalStartAndEnd.lhs);
} else {
baseOffset = new StartLimitedOffset(
new ConciseOffset(filter.goConcise(new IndexBasedBitmapIndexSelector(index))),
intervalStartAndEnd.lhs
);
}
final Map<String, Object> metricHolderCache = Maps.newHashMap();
// This after call is not perfect, if there is an exception during processing, it will never get called,
// but it's better than nothing and doing this properly all the time requires a lot more fixerating
return MoreIterators.after(
FunctionalIterator
.create(gran.iterable(actualInterval.getStartMillis(), actualInterval.getEndMillis()).iterator())
.keep(
new Function<Long, Cursor>()
{
@Override
public Cursor apply(final Long intervalStart)
{
final Offset offset = new TimestampCheckingOffset(
baseOffset,
index.timeOffsets,
Math.min(actualInterval.getEndMillis(), gran.next(intervalStart))
);
return new Cursor()
{
private final Offset initOffset = offset.clone();
private Offset cursorOffset = offset;
private final DateTime timestamp = gran.toDateTime(intervalStart);
@Override
public DateTime getTime()
{
return timestamp;
}
@Override
public void advance()
{
cursorOffset.increment();
}
@Override
public boolean isDone()
{
return !cursorOffset.withinBounds();
}
@Override
public void reset()
{
cursorOffset = initOffset.clone();
}
@Override
public DimensionSelector makeDimensionSelector(String dimension)
{
final String dimensionName = dimension.toLowerCase();
final String[] nameLookup = index.reverseDimLookup.get(dimensionName);
if (nameLookup == null) {
return null;
}
return new DimensionSelector()
{
final Map<String, Integer> dimValLookup = index.dimIdLookup.get(dimensionName);
final DimensionColumn dimColumn = index.dimensionValues.get(dimensionName);
final int[][] dimensionExpansions = dimColumn.getDimensionExpansions();
final int[] dimensionRowValues = dimColumn.getDimensionRowValues();
@Override
public IndexedInts getRow()
{
return new ArrayBasedIndexedInts(dimensionExpansions[dimensionRowValues[cursorOffset.getOffset()]]);
}
@Override
public int getValueCardinality()
{
return nameLookup.length;
}
@Override
public String lookupName(int id)
{
return nameLookup[id];
}
@Override
public int lookupId(String name)
{
final Integer retVal = dimValLookup.get(name);
return retVal == null ? -1 : retVal;
}
};
}
@Override
public FloatMetricSelector makeFloatMetricSelector(String metric)
{
String metricName = metric.toLowerCase();
IndexedFloats cachedFloats = (IndexedFloats) metricHolderCache.get(metric);
if (cachedFloats == null) {
MetricHolder holder = index.metricVals.get(metricName);
if (holder == null) {
return new FloatMetricSelector()
{
@Override
public float get()
{
return 0.0f;
}
};
}
cachedFloats = holder.getFloatType();
metricHolderCache.put(metricName, cachedFloats);
}
final IndexedFloats metricVals = cachedFloats;
return new FloatMetricSelector()
{
@Override
public float get()
{
return metricVals.get(cursorOffset.getOffset());
}
};
}
@Override
public ComplexMetricSelector makeComplexMetricSelector(String metric)
{
final String metricName = metric.toLowerCase();
Indexed cachedComplex = (Indexed) metricHolderCache.get(metricName);
if (cachedComplex == null) {
MetricHolder holder = index.metricVals.get(metricName);
if (holder != null) {
cachedComplex = holder.getComplexType();
metricHolderCache.put(metricName, cachedComplex);
}
}
if (cachedComplex == null) {
return null;
}
final Indexed vals = cachedComplex;
return new ComplexMetricSelector()
{
@Override
public Class classOfObject()
{
return vals.getClazz();
}
@Override
public Object get()
{
return vals.get(cursorOffset.getOffset());
}
};
}
};
}
}
),
new Runnable()
{
@Override
public void run()
{
for (Object object : metricHolderCache.values()) {
if (object instanceof Closeable) {
Closeables.closeQuietly((Closeable) object);
}
}
}
}
);
}
};
}
@Override
public Indexed<String> getAvailableDimensions()
{
return new ArrayIndexed<String>(index.dimensions, String.class);
}
@Override
public Indexed<String> getDimValueLookup(String dimension)
{
return new ListIndexed<String>(
Lists.newArrayList(index.dimIdLookup.get(dimension.toLowerCase()).keySet()), String.class
);
}
@Override
public ImmutableConciseSet getInvertedIndex(String dimension, String dimVal)
{
return index.getInvertedIndex(dimension.toLowerCase(), dimVal);
}
@Override
public ImmutableConciseSet getInvertedIndex(String dimension, int idx)
{
return index.getInvertedIndex(dimension.toLowerCase(), idx);
}
@Override
public Offset getFilterOffset(Filter filter)
{
return new ConciseOffset(filter.goConcise(new IndexBasedBitmapIndexSelector(index)));
}
@Override
public Capabilities getCapabilities()
{
return capabilities;
}
private boolean isReverseDimSorted()
{
for (Map.Entry<String, String[]> entry : index.reverseDimLookup.entrySet()) {
String[] arr = entry.getValue();
for (int i = 0; i < arr.length - 1; i++) {
if (arr[i].compareTo(arr[i + 1]) > 0) {
return false;
}
}
}
return true;
}
private Pair<Integer, Integer> computeTimeStartEnd(Interval interval)
{
DateTime actualIntervalStart = index.dataInterval.getStart();
DateTime actualIntervalEnd = index.dataInterval.getEnd();
if (index.dataInterval.contains(interval.getStart())) {
actualIntervalStart = interval.getStart();
}
if (index.dataInterval.contains(interval.getEnd())) {
actualIntervalEnd = interval.getEnd();
}
return computeOffsets(actualIntervalStart.getMillis(), 0, actualIntervalEnd.getMillis(), index.timeOffsets.length);
}
private Pair<Integer, Integer> computeOffsets(long startMillis, int startOffset, long endMillis, int endOffset)
{
int startIndex = startOffset;
int endIndex = endOffset;
if (index.timeOffsets[startIndex] < startMillis) {
startIndex = Math.abs(Arrays.binarySearch(index.timeOffsets, startMillis));
if (startIndex >= endOffset) {
return new Pair<Integer, Integer>(0, 0);
}
while (startIndex > 0 && index.timeOffsets[startIndex - 1] == startMillis) {
--startIndex;
}
}
if (index.timeOffsets[endIndex - 1] >= endMillis) {
endIndex = Math.abs(Arrays.binarySearch(index.timeOffsets, endMillis));
while (endIndex > startIndex && index.timeOffsets[endIndex - 1] == endMillis) {
--endIndex;
}
}
return new Pair<Integer, Integer>(startIndex, endIndex);
}
private static class TimestampCheckingOffset implements Offset
{
private final Offset baseOffset;
private final long[] timestamps;
private final long threshold;
public TimestampCheckingOffset(
Offset baseOffset,
long[] timestamps,
long threshold
)
{
this.baseOffset = baseOffset;
this.timestamps = timestamps;
this.threshold = threshold;
}
@Override
public int getOffset()
{
return baseOffset.getOffset();
}
@Override
public Offset clone()
{
return new TimestampCheckingOffset(baseOffset.clone(), timestamps, threshold);
}
@Override
public boolean withinBounds()
{
return baseOffset.withinBounds() && timestamps[baseOffset.getOffset()] < threshold;
}
@Override
public void increment()
{
baseOffset.increment();
}
}
private static class IndexBasedBitmapIndexSelector implements BitmapIndexSelector
{
private final Index index;
public IndexBasedBitmapIndexSelector(final Index index)
{
this.index = index;
}
@Override
public Indexed<String> getDimensionValues(final String dimension)
{
return new Indexed<String>()
{
private final String[] dimVals = index.reverseDimLookup.get(dimension.toLowerCase());
@Override
public Class<? extends String> getClazz()
{
return String.class;
}
@Override
public int size()
{
return dimVals.length;
}
@Override
public String get(int index)
{
return dimVals[index];
}
@Override
public int indexOf(String value)
{
return Arrays.binarySearch(dimVals, value);
}
@Override
public Iterator<String> iterator()
{
return Arrays.asList(dimVals).iterator();
}
};
}
@Override
public int getNumRows()
{
return index.timeOffsets.length;
}
@Override
public ImmutableConciseSet getConciseInvertedIndex(String dimension, String value)
{
return index.getInvertedIndex(dimension.toLowerCase(), value);
}
@Override
public ImmutableConciseSet getConciseInvertedIndex(String dimension, int idx)
{
return index.getInvertedIndex(dimension.toLowerCase(), idx);
}
@Override
public ImmutableRTree getSpatialIndex(String dimension)
{
return index.getSpatialIndex(dimension.toLowerCase());
}
}
}

View File

@ -36,6 +36,7 @@ import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.brita.BitmapIndexSelector;
import com.metamx.druid.index.brita.Filter;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.index.column.ColumnCapabilities;
import com.metamx.druid.index.column.ColumnSelector;
import com.metamx.druid.index.column.ComplexColumn;
import com.metamx.druid.index.column.DictionaryEncodedColumn;
@ -49,11 +50,12 @@ import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.kv.IndexedIterable;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.nio.ByteBuffer;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Map;
@ -280,6 +282,8 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
final Map<String, Object> objectColumnCache = Maps.newHashMap();
final GenericColumn timestamps = index.getTimeColumn().getGenericColumn();
final FunctionalIterator<Cursor> retVal = FunctionalIterator
@ -490,6 +494,134 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String columnName = column.toLowerCase();
Object cachedColumnVals = objectColumnCache.get(columnName);
if (cachedColumnVals == null) {
Column holder = index.getColumn(columnName);
if(holder != null) {
final ColumnCapabilities capabilities = holder.getCapabilities();
if(capabilities.hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
);
}
if(capabilities.isDictionaryEncoded()) {
cachedColumnVals = holder.getDictionaryEncoding();
}
else if(capabilities.getType() == ValueType.COMPLEX) {
cachedColumnVals = holder.getComplexColumn();
}
else {
cachedColumnVals = holder.getGenericColumn();
}
}
if(cachedColumnVals != null) {
objectColumnCache.put(columnName, cachedColumnVals);
}
}
if (cachedColumnVals == null) {
return null;
}
if(cachedColumnVals instanceof GenericColumn) {
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
final ValueType type = columnVals.getType();
if(type == ValueType.FLOAT) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return columnVals.getFloatSingleValueRow(cursorOffset.getOffset());
}
};
}
if(type == ValueType.LONG) {
return new ObjectColumnSelector<Long>()
{
@Override
public Class classOfObject()
{
return Long.TYPE;
}
@Override
public Long get()
{
return columnVals.getLongSingleValueRow(cursorOffset.getOffset());
}
};
}
if(type == ValueType.STRING) {
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.getStringSingleValueRow(cursorOffset.getOffset());
}
};
}
}
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.lookupName(columnVals.getSingleValueRow(cursorOffset.getOffset()));
}
};
}
final ComplexColumn columnVals = (ComplexColumn)cachedColumnVals;
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return columnVals.getClazz();
}
@Override
public Object get()
{
return columnVals.getRowValue(cursorOffset.getOffset());
}
};
}
};
}
}
@ -511,6 +643,11 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
for (ComplexColumn complexColumn : complexColumnCache.values()) {
Closeables.closeQuietly(complexColumn);
}
for (Object column : complexColumnCache.values()) {
if(column instanceof Closeable) {
Closeables.closeQuietly((Closeable)column);
}
}
}
}
);
@ -587,6 +724,8 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
{
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
final Map<String, Object> objectColumnCache = Maps.newHashMap();
final GenericColumn timestamps = index.getTimeColumn().getGenericColumn();
final FunctionalIterator<Cursor> retVal = FunctionalIterator
@ -793,6 +932,133 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String columnName = column.toLowerCase();
Object cachedColumnVals = objectColumnCache.get(columnName);
if (cachedColumnVals == null) {
Column holder = index.getColumn(columnName);
if(holder != null) {
if(holder.getCapabilities().hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
);
}
final ValueType type = holder.getCapabilities().getType();
if(holder.getCapabilities().isDictionaryEncoded()) {
cachedColumnVals = holder.getDictionaryEncoding();
}
else if(type == ValueType.COMPLEX) {
cachedColumnVals = holder.getComplexColumn();
}
else {
cachedColumnVals = holder.getGenericColumn();
}
}
if(cachedColumnVals != null) {
objectColumnCache.put(columnName, cachedColumnVals);
}
}
if (cachedColumnVals == null) {
return null;
}
if(cachedColumnVals instanceof GenericColumn) {
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
final ValueType type = columnVals.getType();
if(type == ValueType.FLOAT) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return columnVals.getFloatSingleValueRow(currRow);
}
};
}
if(type == ValueType.LONG) {
return new ObjectColumnSelector<Long>()
{
@Override
public Class classOfObject()
{
return Long.TYPE;
}
@Override
public Long get()
{
return columnVals.getLongSingleValueRow(currRow);
}
};
}
if(type == ValueType.STRING) {
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.getStringSingleValueRow(currRow);
}
};
}
}
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.lookupName(columnVals.getSingleValueRow(currRow));
}
};
}
final ComplexColumn columnVals = (ComplexColumn)cachedColumnVals;
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return columnVals.getClazz();
}
@Override
public Object get()
{
return columnVals.getRowValue(currRow);
}
};
}
};
}
}
@ -812,6 +1078,9 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
for (ComplexColumn complexColumn : complexColumnCache.values()) {
Closeables.closeQuietly(complexColumn);
}
for (Object column : objectColumnCache.values()) {
if(column instanceof Closeable) Closeables.closeQuietly((Closeable)column);
}
}
}
);

View File

@ -19,12 +19,12 @@
package com.metamx.druid.index.v1.processing;
import com.metamx.druid.processing.MetricSelectorFactory;
import com.metamx.druid.processing.ColumnSelectorFactory;
import org.joda.time.DateTime;
/**
*/
public interface Cursor extends MetricSelectorFactory, DimensionSelectorFactory
public interface Cursor extends ColumnSelectorFactory, DimensionSelectorFactory
{
public DateTime getTime();
public void advance();

0
some_file.txt Normal file
View File