mirror of https://github.com/apache/druid.git
bunch of issue fixes
This commit is contained in:
parent
535e254382
commit
15a4dae2b1
|
@ -43,6 +43,7 @@ import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
|
||||||
import com.metamx.druid.coordination.DataSegmentAnnouncer;
|
import com.metamx.druid.coordination.DataSegmentAnnouncer;
|
||||||
import com.metamx.druid.coordination.DruidServerMetadata;
|
import com.metamx.druid.coordination.DruidServerMetadata;
|
||||||
import com.metamx.druid.curator.announcement.Announcer;
|
import com.metamx.druid.curator.announcement.Announcer;
|
||||||
|
import com.metamx.druid.http.NoopRequestLogger;
|
||||||
import com.metamx.druid.http.RequestLogger;
|
import com.metamx.druid.http.RequestLogger;
|
||||||
import com.metamx.druid.initialization.CuratorConfig;
|
import com.metamx.druid.initialization.CuratorConfig;
|
||||||
import com.metamx.druid.initialization.Initialization;
|
import com.metamx.druid.initialization.Initialization;
|
||||||
|
@ -374,12 +375,14 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
|
||||||
getEmitter()
|
getEmitter()
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
else {
|
else if ("file".equalsIgnoreCase(loggingType)) {
|
||||||
setRequestLogger(Initialization.makeFileRequestLogger(
|
setRequestLogger(Initialization.makeFileRequestLogger(
|
||||||
getJsonMapper(),
|
getJsonMapper(),
|
||||||
getScheduledExecutorFactory(),
|
getScheduledExecutorFactory(),
|
||||||
getProps()
|
getProps()
|
||||||
));
|
));
|
||||||
|
} else {
|
||||||
|
setRequestLogger(new NoopRequestLogger());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.http;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class NoopRequestLogger implements RequestLogger
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void log(RequestLogLine requestLogLine) throws Exception
|
||||||
|
{
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
|
@ -46,13 +46,16 @@ import org.joda.time.Minutes;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
|
public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
|
||||||
{
|
{
|
||||||
private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>(){};
|
private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>()
|
||||||
|
{
|
||||||
|
};
|
||||||
private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
|
private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -228,6 +231,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
|
||||||
{
|
{
|
||||||
return left.getId().compareTo(right.getId());
|
return left.getId().compareTo(right.getId());
|
||||||
}
|
}
|
||||||
};
|
}.nullsFirst();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.primitives.Longs;
|
import com.google.common.primitives.Longs;
|
||||||
import com.metamx.druid.processing.ColumnSelectorFactory;
|
import com.metamx.druid.processing.ColumnSelectorFactory;
|
||||||
|
@ -40,6 +41,8 @@ public class CountAggregatorFactory implements AggregatorFactory
|
||||||
@JsonProperty("name") String name
|
@JsonProperty("name") String name
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
|
||||||
|
|
||||||
this.name = name;
|
this.name = name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package com.metamx.druid.aggregation;
|
package com.metamx.druid.aggregation;
|
||||||
|
|
||||||
|
import com.google.common.collect.Ordering;
|
||||||
import com.google.common.primitives.Doubles;
|
import com.google.common.primitives.Doubles;
|
||||||
import com.metamx.druid.processing.FloatMetricSelector;
|
import com.metamx.druid.processing.FloatMetricSelector;
|
||||||
|
|
||||||
|
@ -28,14 +29,14 @@ import java.util.Comparator;
|
||||||
*/
|
*/
|
||||||
public class DoubleSumAggregator implements Aggregator
|
public class DoubleSumAggregator implements Aggregator
|
||||||
{
|
{
|
||||||
static final Comparator COMPARATOR = new Comparator()
|
static final Comparator COMPARATOR = new Ordering()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public int compare(Object o, Object o1)
|
public int compare(Object o, Object o1)
|
||||||
{
|
{
|
||||||
return Doubles.compare(((Number) o).doubleValue(), ((Number) o1).doubleValue());
|
return Doubles.compare(((Number) o).doubleValue(), ((Number) o1).doubleValue());
|
||||||
}
|
}
|
||||||
};
|
}.nullsFirst();
|
||||||
|
|
||||||
static double combineValues(Object lhs, Object rhs)
|
static double combineValues(Object lhs, Object rhs)
|
||||||
{
|
{
|
||||||
|
|
|
@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.primitives.Doubles;
|
import com.google.common.primitives.Doubles;
|
||||||
import com.metamx.druid.processing.ColumnSelectorFactory;
|
import com.metamx.druid.processing.ColumnSelectorFactory;
|
||||||
|
|
||||||
|
@ -44,6 +45,9 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
|
||||||
@JsonProperty("fieldName") final String fieldName
|
@JsonProperty("fieldName") final String fieldName
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
Preconditions.checkNotNull(name, "Must have a valid, nonl null aggregator name");
|
||||||
|
Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
|
||||||
|
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.fieldName = fieldName;
|
this.fieldName = fieldName;
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,8 @@ package com.metamx.druid.aggregation;
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.primitives.Floats;
|
import com.google.common.primitives.Floats;
|
||||||
import com.google.common.primitives.Longs;
|
import com.google.common.primitives.Longs;
|
||||||
import com.metamx.druid.processing.ColumnSelectorFactory;
|
import com.metamx.druid.processing.ColumnSelectorFactory;
|
||||||
|
@ -49,12 +51,18 @@ public class HistogramAggregatorFactory implements AggregatorFactory
|
||||||
@JsonProperty("breaks") final List<Float> breaksList
|
@JsonProperty("breaks") final List<Float> breaksList
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
Preconditions.checkNotNull(name, "Must have a valid, nonl null aggregator name");
|
||||||
|
Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
|
||||||
|
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.fieldName = fieldName;
|
this.fieldName = fieldName;
|
||||||
this.breaksList = breaksList;
|
this.breaksList = (breaksList == null) ? Lists.<Float>newArrayList() :breaksList;
|
||||||
this.breaks = new float[breaksList.size()];
|
this.breaks = new float[this.breaksList.size()];
|
||||||
for(int i = 0; i < breaksList.size(); ++i) this.breaks[i] = breaksList.get(i);
|
for (int i = 0; i < this.breaksList.size(); ++i) {
|
||||||
|
this.breaks[i] = this.breaksList.get(i);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||||
{
|
{
|
||||||
|
@ -95,14 +103,12 @@ public class HistogramAggregatorFactory implements AggregatorFactory
|
||||||
@Override
|
@Override
|
||||||
public Object deserialize(Object object)
|
public Object deserialize(Object object)
|
||||||
{
|
{
|
||||||
if (object instanceof byte []) {
|
if (object instanceof byte[]) {
|
||||||
return Histogram.fromBytes((byte []) object);
|
return Histogram.fromBytes((byte[]) object);
|
||||||
}
|
} else if (object instanceof ByteBuffer) {
|
||||||
else if (object instanceof ByteBuffer) {
|
|
||||||
return Histogram.fromBytes((ByteBuffer) object);
|
return Histogram.fromBytes((ByteBuffer) object);
|
||||||
}
|
} else if (object instanceof String) {
|
||||||
else if(object instanceof String) {
|
byte[] bytes = Base64.decodeBase64(((String) object).getBytes(Charsets.UTF_8));
|
||||||
byte[] bytes = Base64.decodeBase64(((String)object).getBytes(Charsets.UTF_8));
|
|
||||||
return Histogram.fromBytes(bytes);
|
return Histogram.fromBytes(bytes);
|
||||||
}
|
}
|
||||||
return object;
|
return object;
|
||||||
|
@ -111,7 +117,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory
|
||||||
@Override
|
@Override
|
||||||
public Object finalizeComputation(Object object)
|
public Object finalizeComputation(Object object)
|
||||||
{
|
{
|
||||||
return ((Histogram)object).asVisual();
|
return ((Histogram) object).asVisual();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -149,7 +155,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory
|
||||||
@Override
|
@Override
|
||||||
public String getTypeName()
|
public String getTypeName()
|
||||||
{
|
{
|
||||||
throw new UnsupportedOperationException("HistogramAggregatorFactory does not support getTypeName()");
|
throw new UnsupportedOperationException("HistogramAggregatorFactory does not support getTypeName()");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.primitives.Doubles;
|
import com.google.common.primitives.Doubles;
|
||||||
import com.metamx.druid.processing.ColumnSelectorFactory;
|
import com.metamx.druid.processing.ColumnSelectorFactory;
|
||||||
|
@ -51,7 +52,6 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
private final String fnCombine;
|
private final String fnCombine;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private final JavaScriptAggregator.ScriptAggregator compiledScript;
|
private final JavaScriptAggregator.ScriptAggregator compiledScript;
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
|
@ -63,6 +63,12 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
@JsonProperty("fnCombine") final String fnCombine
|
@JsonProperty("fnCombine") final String fnCombine
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
|
||||||
|
Preconditions.checkNotNull(fieldNames, "Must have a valid, non null fieldNames");
|
||||||
|
Preconditions.checkNotNull(fnAggregate, "Must have a valid, non null fnAggregate");
|
||||||
|
Preconditions.checkNotNull(fnReset, "Must have a valid, non null fnReset");
|
||||||
|
Preconditions.checkNotNull(fnCombine, "Must have a valid, non null fnCombine");
|
||||||
|
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.fieldNames = fieldNames;
|
this.fieldNames = fieldNames;
|
||||||
|
|
||||||
|
@ -83,7 +89,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
new com.google.common.base.Function<String, ObjectColumnSelector>()
|
new com.google.common.base.Function<String, ObjectColumnSelector>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public ObjectColumnSelector apply(@Nullable String s) {
|
public ObjectColumnSelector apply(@Nullable String s)
|
||||||
|
{
|
||||||
return columnFactory.makeObjectColumnSelector(s);
|
return columnFactory.makeObjectColumnSelector(s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -101,7 +108,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
new com.google.common.base.Function<String, ObjectColumnSelector>()
|
new com.google.common.base.Function<String, ObjectColumnSelector>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public ObjectColumnSelector apply(@Nullable String s) {
|
public ObjectColumnSelector apply(@Nullable String s)
|
||||||
|
{
|
||||||
return columnSelectorFactory.makeObjectColumnSelector(s);
|
return columnSelectorFactory.makeObjectColumnSelector(s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -148,7 +156,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public List<String> getFieldNames() {
|
public List<String> getFieldNames()
|
||||||
|
{
|
||||||
return fieldNames;
|
return fieldNames;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,7 +191,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
try {
|
try {
|
||||||
MessageDigest md = MessageDigest.getInstance("SHA-1");
|
MessageDigest md = MessageDigest.getInstance("SHA-1");
|
||||||
byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes(Charsets.UTF_8);
|
byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes(Charsets.UTF_8);
|
||||||
byte[] sha1 = md.digest((fnAggregate+fnReset+fnCombine).getBytes(Charsets.UTF_8));
|
byte[] sha1 = md.digest((fnAggregate + fnReset + fnCombine).getBytes(Charsets.UTF_8));
|
||||||
|
|
||||||
return ByteBuffer.allocate(1 + fieldNameBytes.length + sha1.length)
|
return ByteBuffer.allocate(1 + fieldNameBytes.length + sha1.length)
|
||||||
.put(CACHE_TYPE_ID)
|
.put(CACHE_TYPE_ID)
|
||||||
|
@ -225,7 +234,11 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|
||||||
public static JavaScriptAggregator.ScriptAggregator compileScript(final String aggregate, final String reset, final String combine)
|
public static JavaScriptAggregator.ScriptAggregator compileScript(
|
||||||
|
final String aggregate,
|
||||||
|
final String reset,
|
||||||
|
final String combine
|
||||||
|
)
|
||||||
{
|
{
|
||||||
final ContextFactory contextFactory = ContextFactory.getGlobal();
|
final ContextFactory contextFactory = ContextFactory.getGlobal();
|
||||||
Context context = contextFactory.enterContext();
|
Context context = contextFactory.enterContext();
|
||||||
|
@ -234,8 +247,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
final ScriptableObject scope = context.initStandardObjects();
|
final ScriptableObject scope = context.initStandardObjects();
|
||||||
|
|
||||||
final Function fnAggregate = context.compileFunction(scope, aggregate, "aggregate", 1, null);
|
final Function fnAggregate = context.compileFunction(scope, aggregate, "aggregate", 1, null);
|
||||||
final Function fnReset = context.compileFunction(scope, reset, "reset", 1, null);
|
final Function fnReset = context.compileFunction(scope, reset, "reset", 1, null);
|
||||||
final Function fnCombine = context.compileFunction(scope, combine, "combine", 1, null);
|
final Function fnCombine = context.compileFunction(scope, combine, "combine", 1, null);
|
||||||
Context.exit();
|
Context.exit();
|
||||||
|
|
||||||
return new JavaScriptAggregator.ScriptAggregator()
|
return new JavaScriptAggregator.ScriptAggregator()
|
||||||
|
@ -244,7 +257,9 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
public double aggregate(final double current, final ObjectColumnSelector[] selectorList)
|
public double aggregate(final double current, final ObjectColumnSelector[] selectorList)
|
||||||
{
|
{
|
||||||
Context cx = Context.getCurrentContext();
|
Context cx = Context.getCurrentContext();
|
||||||
if(cx == null) cx = contextFactory.enterContext();
|
if (cx == null) {
|
||||||
|
cx = contextFactory.enterContext();
|
||||||
|
}
|
||||||
|
|
||||||
final int size = selectorList.length;
|
final int size = selectorList.length;
|
||||||
final Object[] args = new Object[size + 1];
|
final Object[] args = new Object[size + 1];
|
||||||
|
@ -292,8 +307,9 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close()
|
||||||
if(Context.getCurrentContext() != null) {
|
{
|
||||||
|
if (Context.getCurrentContext() != null) {
|
||||||
Context.exit();
|
Context.exit();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.primitives.Longs;
|
import com.google.common.primitives.Longs;
|
||||||
import com.metamx.druid.processing.ColumnSelectorFactory;
|
import com.metamx.druid.processing.ColumnSelectorFactory;
|
||||||
|
|
||||||
|
@ -44,6 +45,9 @@ public class LongSumAggregatorFactory implements AggregatorFactory
|
||||||
@JsonProperty("fieldName") final String fieldName
|
@JsonProperty("fieldName") final String fieldName
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
Preconditions.checkNotNull(name, "Must have a valid, nonl null aggregator name");
|
||||||
|
Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
|
||||||
|
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.fieldName = fieldName;
|
this.fieldName = fieldName;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.primitives.Doubles;
|
import com.google.common.primitives.Doubles;
|
||||||
import com.metamx.druid.processing.ColumnSelectorFactory;
|
import com.metamx.druid.processing.ColumnSelectorFactory;
|
||||||
|
|
||||||
|
@ -44,6 +45,9 @@ public class MaxAggregatorFactory implements AggregatorFactory
|
||||||
@JsonProperty("fieldName") final String fieldName
|
@JsonProperty("fieldName") final String fieldName
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
|
||||||
|
Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
|
||||||
|
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.fieldName = fieldName;
|
this.fieldName = fieldName;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.primitives.Doubles;
|
import com.google.common.primitives.Doubles;
|
||||||
import com.metamx.druid.processing.ColumnSelectorFactory;
|
import com.metamx.druid.processing.ColumnSelectorFactory;
|
||||||
|
|
||||||
|
@ -44,6 +45,9 @@ public class MinAggregatorFactory implements AggregatorFactory
|
||||||
@JsonProperty("fieldName") final String fieldName
|
@JsonProperty("fieldName") final String fieldName
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
|
||||||
|
Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
|
||||||
|
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.fieldName = fieldName;
|
this.fieldName = fieldName;
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,9 @@ import com.metamx.druid.query.QueryRunner;
|
||||||
import com.metamx.druid.query.QueryRunnerFactory;
|
import com.metamx.druid.query.QueryRunnerFactory;
|
||||||
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
|
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
|
||||||
import com.metamx.druid.query.QueryToolChest;
|
import com.metamx.druid.query.QueryToolChest;
|
||||||
|
import com.metamx.druid.query.segment.SegmentDescriptor;
|
||||||
|
import com.metamx.druid.query.segment.SpecificSegmentQueryRunner;
|
||||||
|
import com.metamx.druid.query.segment.SpecificSegmentSpec;
|
||||||
import com.metamx.druid.realtime.FireDepartmentMetrics;
|
import com.metamx.druid.realtime.FireDepartmentMetrics;
|
||||||
import com.metamx.druid.realtime.FireHydrant;
|
import com.metamx.druid.realtime.FireHydrant;
|
||||||
import com.metamx.druid.realtime.Schema;
|
import com.metamx.druid.realtime.Schema;
|
||||||
|
@ -253,23 +256,32 @@ public class RealtimePlumberSchool implements PlumberSchool
|
||||||
new Function<Sink, QueryRunner<T>>()
|
new Function<Sink, QueryRunner<T>>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public QueryRunner<T> apply(@Nullable Sink input)
|
public QueryRunner<T> apply(Sink input)
|
||||||
{
|
{
|
||||||
return new MetricsEmittingQueryRunner<T>(
|
return new SpecificSegmentQueryRunner<T>(
|
||||||
emitter,
|
new MetricsEmittingQueryRunner<T>(
|
||||||
builderFn,
|
emitter,
|
||||||
factory.mergeRunners(
|
builderFn,
|
||||||
EXEC,
|
factory.mergeRunners(
|
||||||
Iterables.transform(
|
EXEC,
|
||||||
input,
|
Iterables.transform(
|
||||||
new Function<FireHydrant, QueryRunner<T>>()
|
input,
|
||||||
{
|
new Function<FireHydrant, QueryRunner<T>>()
|
||||||
@Override
|
{
|
||||||
public QueryRunner<T> apply(@Nullable FireHydrant input)
|
@Override
|
||||||
{
|
public QueryRunner<T> apply(FireHydrant input)
|
||||||
return factory.createRunner(input.getSegment());
|
{
|
||||||
}
|
return factory.createRunner(input.getSegment());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
new SpecificSegmentSpec(
|
||||||
|
new SegmentDescriptor(
|
||||||
|
input.getInterval(),
|
||||||
|
input.getSegment().getVersion(),
|
||||||
|
input.getSegment().getShardSpec().getPartitionNum()
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
@ -380,12 +392,16 @@ public class RealtimePlumberSchool implements PlumberSchool
|
||||||
|
|
||||||
//final File[] sinkFiles = sinkDir.listFiles();
|
//final File[] sinkFiles = sinkDir.listFiles();
|
||||||
// To avoid reading and listing of "merged" dir
|
// To avoid reading and listing of "merged" dir
|
||||||
final File[] sinkFiles = sinkDir.listFiles(new FilenameFilter() {
|
final File[] sinkFiles = sinkDir.listFiles(
|
||||||
@Override
|
new FilenameFilter()
|
||||||
public boolean accept(File dir, String fileName) {
|
{
|
||||||
return !(Ints.tryParse(fileName) == null);
|
@Override
|
||||||
}
|
public boolean accept(File dir, String fileName)
|
||||||
});
|
{
|
||||||
|
return !(Ints.tryParse(fileName) == null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
Arrays.sort(
|
Arrays.sort(
|
||||||
sinkFiles,
|
sinkFiles,
|
||||||
new Comparator<File>()
|
new Comparator<File>()
|
||||||
|
@ -412,8 +428,7 @@ public class RealtimePlumberSchool implements PlumberSchool
|
||||||
// Although this has been tackled at start of this method.
|
// Although this has been tackled at start of this method.
|
||||||
// Just a doubly-check added to skip "merged" dir. from being added to hydrants
|
// Just a doubly-check added to skip "merged" dir. from being added to hydrants
|
||||||
// If 100% sure that this is not needed, this check can be removed.
|
// If 100% sure that this is not needed, this check can be removed.
|
||||||
if(Ints.tryParse(segmentDir.getName()) == null)
|
if (Ints.tryParse(segmentDir.getName()) == null) {
|
||||||
{
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -112,15 +112,16 @@ public class DruidMasterBalancer implements DruidMasterHelper
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int iter = 0;
|
for (int iter = 0; iter < maxSegmentsToMove; iter++) {
|
||||||
while (iter < maxSegmentsToMove) {
|
|
||||||
iter++;
|
|
||||||
final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList);
|
final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList);
|
||||||
final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList);
|
|
||||||
if (holder == null) {
|
if (params.getAvailableSegments().contains(segmentToMove.getSegment())) {
|
||||||
continue;
|
final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList);
|
||||||
|
|
||||||
|
if (holder != null) {
|
||||||
|
moveSegment(segmentToMove, holder.getServer(), params);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
moveSegment(segmentToMove, holder.getServer(), params);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList);
|
final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList);
|
||||||
|
|
|
@ -62,16 +62,18 @@ public abstract class LoadRule implements Rule
|
||||||
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
|
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
|
||||||
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
|
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
|
||||||
|
|
||||||
stats.accumulate(
|
if (params.getAvailableSegments().contains(segment)) {
|
||||||
assign(
|
stats.accumulate(
|
||||||
params.getReplicationManager(),
|
assign(
|
||||||
expectedReplicants,
|
params.getReplicationManager(),
|
||||||
totalReplicants,
|
expectedReplicants,
|
||||||
analyzer,
|
totalReplicants,
|
||||||
serverHolderList,
|
analyzer,
|
||||||
segment
|
serverHolderList,
|
||||||
)
|
segment
|
||||||
);
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));
|
stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,9 @@ import org.joda.time.DateTime;
|
||||||
@JsonSubTypes.Type(name = "loadByPeriod", value = PeriodLoadRule.class),
|
@JsonSubTypes.Type(name = "loadByPeriod", value = PeriodLoadRule.class),
|
||||||
@JsonSubTypes.Type(name = "loadByInterval", value = IntervalLoadRule.class),
|
@JsonSubTypes.Type(name = "loadByInterval", value = IntervalLoadRule.class),
|
||||||
@JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class),
|
@JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class),
|
||||||
@JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class)
|
@JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class),
|
||||||
|
@JsonSubTypes.Type(name = "loadBySize", value = SizeLoadRule.class),
|
||||||
|
@JsonSubTypes.Type(name = "dropBySize", value = SizeDropRule.class)
|
||||||
})
|
})
|
||||||
|
|
||||||
public interface Rule
|
public interface Rule
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.master.rules;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.collect.Range;
|
||||||
|
import com.metamx.druid.client.DataSegment;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class SizeDropRule extends DropRule
|
||||||
|
{
|
||||||
|
private final long low;
|
||||||
|
private final long high;
|
||||||
|
private final Range<Long> range;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public SizeDropRule(
|
||||||
|
@JsonProperty("low") long low,
|
||||||
|
@JsonProperty("high") long high
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.low = low;
|
||||||
|
this.high = high;
|
||||||
|
this.range = Range.closedOpen(low, high);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public String getType()
|
||||||
|
{
|
||||||
|
return "dropBySize";
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public long getLow()
|
||||||
|
{
|
||||||
|
return low;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public long getHigh()
|
||||||
|
{
|
||||||
|
return high;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
||||||
|
{
|
||||||
|
return range.contains(segment.getSize());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,96 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.master.rules;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.collect.Range;
|
||||||
|
import com.metamx.druid.client.DataSegment;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class SizeLoadRule extends LoadRule
|
||||||
|
{
|
||||||
|
private final long low;
|
||||||
|
private final long high;
|
||||||
|
private final Integer replicants;
|
||||||
|
private final String tier;
|
||||||
|
private final Range<Long> range;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public SizeLoadRule(
|
||||||
|
@JsonProperty("low") long low,
|
||||||
|
@JsonProperty("high") long high,
|
||||||
|
@JsonProperty("replicants") Integer replicants,
|
||||||
|
@JsonProperty("tier") String tier
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.low = low;
|
||||||
|
this.high = high;
|
||||||
|
this.replicants = replicants;
|
||||||
|
this.tier = tier;
|
||||||
|
this.range = Range.closedOpen(low, high);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public int getReplicants()
|
||||||
|
{
|
||||||
|
return replicants;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getReplicants(String tier)
|
||||||
|
{
|
||||||
|
return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty
|
||||||
|
public String getTier()
|
||||||
|
{
|
||||||
|
return tier;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getType()
|
||||||
|
{
|
||||||
|
return "loadBySize";
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public long getLow()
|
||||||
|
{
|
||||||
|
return low;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public long getHigh()
|
||||||
|
{
|
||||||
|
return high;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
||||||
|
{
|
||||||
|
return range.contains(segment.getSize());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue