Merge branch 'master' into worker-resource

Conflicts:
	client/src/main/java/com/metamx/druid/QueryableNode.java

Author: fjy
Date: 2013-06-27 12:52:58 -07:00
Commit: d5306c5dd9

17 changed files with 348 additions and 85 deletions

File: client/src/main/java/com/metamx/druid/QueryableNode.java

@@ -46,6 +46,7 @@ import com.metamx.druid.coordination.DruidServerMetadata;
 import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
 import com.metamx.druid.curator.SegmentReader;
 import com.metamx.druid.curator.announcement.Announcer;
+import com.metamx.druid.http.NoopRequestLogger;
 import com.metamx.druid.http.RequestLogger;
 import com.metamx.druid.initialization.CuratorConfig;
 import com.metamx.druid.initialization.Initialization;
@@ -371,21 +372,20 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
     if (requestLogger == null) {
       try {
         final String loggingType = props.getProperty("druid.request.logging.type");
-        if ("emitter".equals(loggingType)) {
-          setRequestLogger(
-              Initialization.makeEmittingRequestLogger(
-                  getProps(),
-                  getEmitter()
-              )
-          );
+        if("emitter".equals(loggingType)) {
+          setRequestLogger(Initialization.makeEmittingRequestLogger(
+              getProps(),
+              getEmitter()
+          ));
+        }
+        else if ("file".equalsIgnoreCase(loggingType)) {
+          setRequestLogger(Initialization.makeFileRequestLogger(
+              getJsonMapper(),
+              getScheduledExecutorFactory(),
+              getProps()
+          ));
         } else {
-          setRequestLogger(
-              Initialization.makeFileRequestLogger(
-                  getJsonMapper(),
-                  getScheduledExecutorFactory(),
-                  getProps()
-              )
-          );
+          setRequestLogger(new NoopRequestLogger());
         }
       }
       catch (IOException e) {
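
Note: the merged branch now keys the request logger off a single runtime property. A minimal sketch of the resulting configuration surface, assuming a standard Druid runtime.properties file (the key comes from the hunk above; the value is illustrative):

    # "emitter" selects the emitting logger, "file" the file-backed logger;
    # any other value, or leaving the key unset, now falls back to NoopRequestLogger.
    druid.request.logging.type=file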

File: NoopRequestLogger.java (new file)

@@ -0,0 +1,31 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.http;
+
+/**
+ */
+public class NoopRequestLogger implements RequestLogger
+{
+  @Override
+  public void log(RequestLogLine requestLogLine) throws Exception
+  {
+    // do nothing
+  }
+}

File: SegmentMetadataQueryQueryToolChest.java

@@ -46,13 +46,16 @@ import org.joda.time.Minutes;
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
 {
-  private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>(){};
+  private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>()
+  {
+  };
 
   private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
 
   @Override
@@ -228,6 +231,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
       {
         return left.getId().compareTo(right.getId());
       }
-    };
+    }.nullsFirst();
   }
 }

File: CountAggregatorFactory.java

@@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Longs;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -40,6 +41,8 @@ public class CountAggregatorFactory implements AggregatorFactory
       @JsonProperty("name") String name
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
+
     this.name = name;
   }
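
Note: the same fail-fast null check is added to each aggregator factory constructor in this commit. A self-contained sketch of the failure mode (hypothetical demo class, not part of the commit):

    import com.google.common.base.Preconditions;

    public class NullCheckDemo
    {
      public static void main(String[] args)
      {
        // Throws NullPointerException carrying the message below, which is what the
        // new constructors do when a required field is absent from the JSON spec.
        Preconditions.checkNotNull(null, "Must have a valid, non null aggregator name");
      }
    }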

File: DoubleSumAggregator.java

@@ -19,6 +19,7 @@
 package com.metamx.druid.aggregation;
 
+import com.google.common.collect.Ordering;
 import com.google.common.primitives.Doubles;
 import com.metamx.druid.processing.FloatMetricSelector;
@@ -28,14 +29,14 @@ import java.util.Comparator;
  */
 public class DoubleSumAggregator implements Aggregator
 {
-  static final Comparator COMPARATOR = new Comparator()
+  static final Comparator COMPARATOR = new Ordering()
   {
     @Override
     public int compare(Object o, Object o1)
     {
       return Doubles.compare(((Number) o).doubleValue(), ((Number) o1).doubleValue());
     }
-  };
+  }.nullsFirst();
 
   static double combineValues(Object lhs, Object rhs)
   {

File: DoubleSumAggregatorFactory.java

@@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.Doubles;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -44,6 +45,9 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
       @JsonProperty("fieldName") final String fieldName
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
+
     this.name = name;
     this.fieldName = fieldName;
   }

File: HistogramAggregatorFactory.java

@@ -22,6 +22,8 @@ package com.metamx.druid.aggregation;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.primitives.Floats;
 import com.google.common.primitives.Longs;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -49,12 +51,18 @@ public class HistogramAggregatorFactory implements AggregatorFactory
       @JsonProperty("breaks") final List<Float> breaksList
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
+
     this.name = name;
     this.fieldName = fieldName;
-    this.breaksList = breaksList;
-    this.breaks = new float[breaksList.size()];
-    for(int i = 0; i < breaksList.size(); ++i) this.breaks[i] = breaksList.get(i);
+    this.breaksList = (breaksList == null) ? Lists.<Float>newArrayList() : breaksList;
+    this.breaks = new float[this.breaksList.size()];
+    for (int i = 0; i < this.breaksList.size(); ++i) {
+      this.breaks[i] = this.breaksList.get(i);
+    }
   }
 
   @Override
   public Aggregator factorize(ColumnSelectorFactory metricFactory)
   {
@@ -95,14 +103,12 @@ public class HistogramAggregatorFactory implements AggregatorFactory
   @Override
   public Object deserialize(Object object)
   {
-    if (object instanceof byte []) {
-      return Histogram.fromBytes((byte []) object);
-    }
-    else if (object instanceof ByteBuffer) {
+    if (object instanceof byte[]) {
+      return Histogram.fromBytes((byte[]) object);
+    } else if (object instanceof ByteBuffer) {
       return Histogram.fromBytes((ByteBuffer) object);
-    }
-    else if(object instanceof String) {
-      byte[] bytes = Base64.decodeBase64(((String)object).getBytes(Charsets.UTF_8));
+    } else if (object instanceof String) {
+      byte[] bytes = Base64.decodeBase64(((String) object).getBytes(Charsets.UTF_8));
       return Histogram.fromBytes(bytes);
     }
     return object;
@@ -111,7 +117,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory
   @Override
   public Object finalizeComputation(Object object)
   {
-    return ((Histogram)object).asVisual();
+    return ((Histogram) object).asVisual();
   }
 
   @Override
@@ -149,7 +155,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory
   @Override
   public String getTypeName()
   {
     throw new UnsupportedOperationException("HistogramAggregatorFactory does not support getTypeName()");
   }
 
   @Override

File: JavaScriptAggregatorFactory.java

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Doubles;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -51,7 +52,6 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
   private final String fnCombine;
 
   private final JavaScriptAggregator.ScriptAggregator compiledScript;
 
   @JsonCreator
@@ -63,6 +63,12 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
       @JsonProperty("fnCombine") final String fnCombine
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
+    Preconditions.checkNotNull(fieldNames, "Must have a valid, non null fieldNames");
+    Preconditions.checkNotNull(fnAggregate, "Must have a valid, non null fnAggregate");
+    Preconditions.checkNotNull(fnReset, "Must have a valid, non null fnReset");
+    Preconditions.checkNotNull(fnCombine, "Must have a valid, non null fnCombine");
+
     this.name = name;
     this.fieldNames = fieldNames;
@@ -83,7 +89,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
             new com.google.common.base.Function<String, ObjectColumnSelector>()
             {
               @Override
-              public ObjectColumnSelector apply(@Nullable String s) {
+              public ObjectColumnSelector apply(@Nullable String s)
+              {
                 return columnFactory.makeObjectColumnSelector(s);
               }
             }
@@ -101,7 +108,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
             new com.google.common.base.Function<String, ObjectColumnSelector>()
             {
               @Override
-              public ObjectColumnSelector apply(@Nullable String s) {
+              public ObjectColumnSelector apply(@Nullable String s)
+              {
                 return columnSelectorFactory.makeObjectColumnSelector(s);
               }
             }
@@ -148,7 +156,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
   }
 
   @JsonProperty
-  public List<String> getFieldNames() {
+  public List<String> getFieldNames()
+  {
     return fieldNames;
   }
@@ -182,7 +191,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
     try {
       MessageDigest md = MessageDigest.getInstance("SHA-1");
       byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes(Charsets.UTF_8);
-      byte[] sha1 = md.digest((fnAggregate+fnReset+fnCombine).getBytes(Charsets.UTF_8));
+      byte[] sha1 = md.digest((fnAggregate + fnReset + fnCombine).getBytes(Charsets.UTF_8));
 
       return ByteBuffer.allocate(1 + fieldNameBytes.length + sha1.length)
                        .put(CACHE_TYPE_ID)
@@ -225,7 +234,11 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
            '}';
   }
 
-  public static JavaScriptAggregator.ScriptAggregator compileScript(final String aggregate, final String reset, final String combine)
+  public static JavaScriptAggregator.ScriptAggregator compileScript(
+      final String aggregate,
+      final String reset,
+      final String combine
+  )
   {
     final ContextFactory contextFactory = ContextFactory.getGlobal();
     Context context = contextFactory.enterContext();
@@ -234,8 +247,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
     final ScriptableObject scope = context.initStandardObjects();
 
     final Function fnAggregate = context.compileFunction(scope, aggregate, "aggregate", 1, null);
     final Function fnReset = context.compileFunction(scope, reset, "reset", 1, null);
     final Function fnCombine = context.compileFunction(scope, combine, "combine", 1, null);
     Context.exit();
 
     return new JavaScriptAggregator.ScriptAggregator()
@@ -244,7 +257,9 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
       public double aggregate(final double current, final ObjectColumnSelector[] selectorList)
       {
         Context cx = Context.getCurrentContext();
-        if(cx == null) cx = contextFactory.enterContext();
+        if (cx == null) {
+          cx = contextFactory.enterContext();
+        }
 
         final int size = selectorList.length;
         final Object[] args = new Object[size + 1];
@@ -292,8 +307,9 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
       }
 
       @Override
-      public void close() {
-        if(Context.getCurrentContext() != null) {
+      public void close()
+      {
+        if (Context.getCurrentContext() != null) {
           Context.exit();
         }
       }
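
Note: compileScript takes the three JavaScript function bodies that an aggregator spec supplies. A hedged usage sketch (the method signature is from the hunk above; the script bodies are illustrative, not from this commit):

    // Hypothetical scripts showing the aggregate/reset/combine contract.
    JavaScriptAggregator.ScriptAggregator scriptAggregator =
        JavaScriptAggregatorFactory.compileScript(
            "function aggregate(current, x) { return current + (x || 0); }",
            "function reset() { return 0; }",
            "function combine(a, b) { return a + b; }"
        );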

File: LongSumAggregatorFactory.java

@@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -44,6 +45,9 @@ public class LongSumAggregatorFactory implements AggregatorFactory
       @JsonProperty("fieldName") final String fieldName
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
+
     this.name = name;
     this.fieldName = fieldName;
   }

File: MaxAggregatorFactory.java

@@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.Doubles;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -44,6 +45,9 @@ public class MaxAggregatorFactory implements AggregatorFactory
       @JsonProperty("fieldName") final String fieldName
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
+
     this.name = name;
     this.fieldName = fieldName;
   }

File: MinAggregatorFactory.java

@@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.Doubles;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -44,6 +45,9 @@ public class MinAggregatorFactory implements AggregatorFactory
       @JsonProperty("fieldName") final String fieldName
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
+
     this.name = name;
     this.fieldName = fieldName;
   }

File: RealtimePlumberSchool.java

@@ -55,6 +55,9 @@ import com.metamx.druid.query.QueryRunner;
 import com.metamx.druid.query.QueryRunnerFactory;
 import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
 import com.metamx.druid.query.QueryToolChest;
+import com.metamx.druid.query.segment.SegmentDescriptor;
+import com.metamx.druid.query.segment.SpecificSegmentQueryRunner;
+import com.metamx.druid.query.segment.SpecificSegmentSpec;
 import com.metamx.druid.realtime.FireDepartmentMetrics;
 import com.metamx.druid.realtime.FireHydrant;
 import com.metamx.druid.realtime.Schema;
@@ -253,23 +256,32 @@ public class RealtimePlumberSchool implements PlumberSchool
             new Function<Sink, QueryRunner<T>>()
             {
               @Override
-              public QueryRunner<T> apply(@Nullable Sink input)
+              public QueryRunner<T> apply(Sink input)
               {
-                return new MetricsEmittingQueryRunner<T>(
-                    emitter,
-                    builderFn,
-                    factory.mergeRunners(
-                        EXEC,
-                        Iterables.transform(
-                            input,
-                            new Function<FireHydrant, QueryRunner<T>>()
-                            {
-                              @Override
-                              public QueryRunner<T> apply(@Nullable FireHydrant input)
-                              {
-                                return factory.createRunner(input.getSegment());
-                              }
-                            }
-                        )
-                    )
+                return new SpecificSegmentQueryRunner<T>(
+                    new MetricsEmittingQueryRunner<T>(
+                        emitter,
+                        builderFn,
+                        factory.mergeRunners(
+                            EXEC,
+                            Iterables.transform(
+                                input,
+                                new Function<FireHydrant, QueryRunner<T>>()
+                                {
+                                  @Override
+                                  public QueryRunner<T> apply(FireHydrant input)
+                                  {
+                                    return factory.createRunner(input.getSegment());
+                                  }
+                                }
+                            )
+                        )
+                    ),
+                    new SpecificSegmentSpec(
+                        new SegmentDescriptor(
+                            input.getInterval(),
+                            input.getSegment().getVersion(),
+                            input.getSegment().getShardSpec().getPartitionNum()
+                        )
                     )
                 );
@@ -380,12 +392,16 @@ public class RealtimePlumberSchool implements PlumberSchool
     //final File[] sinkFiles = sinkDir.listFiles();
     // To avoid reading and listing of "merged" dir
-    final File[] sinkFiles = sinkDir.listFiles(new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String fileName) {
-        return !(Ints.tryParse(fileName) == null);
-      }
-    });
+    final File[] sinkFiles = sinkDir.listFiles(
+        new FilenameFilter()
+        {
+          @Override
+          public boolean accept(File dir, String fileName)
+          {
+            return !(Ints.tryParse(fileName) == null);
+          }
+        }
+    );
 
     Arrays.sort(
         sinkFiles,
         new Comparator<File>()
@@ -412,8 +428,7 @@ public class RealtimePlumberSchool implements PlumberSchool
       // Although this has been tackled at start of this method.
       // Just a double-check added to skip "merged" dir. from being added to hydrants
       // If 100% sure that this is not needed, this check can be removed.
-      if(Ints.tryParse(segmentDir.getName()) == null)
-      {
+      if (Ints.tryParse(segmentDir.getName()) == null) {
         continue;
       }

File: DruidMasterBalancer.java

@@ -112,15 +112,16 @@ public class DruidMasterBalancer implements DruidMasterHelper
         continue;
       }
 
-      int iter = 0;
-      while (iter < maxSegmentsToMove) {
-        iter++;
+      for (int iter = 0; iter < maxSegmentsToMove; iter++) {
         final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList);
-        final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList);
-        if (holder == null) {
-          continue;
+
+        if (params.getAvailableSegments().contains(segmentToMove.getSegment())) {
+          final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList);
+          if (holder != null) {
+            moveSegment(segmentToMove, holder.getServer(), params);
+          }
         }
-        moveSegment(segmentToMove, holder.getServer(), params);
       }
 
       final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList);

File: LoadRule.java

@@ -62,16 +62,18 @@ public abstract class LoadRule implements Rule
     final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
     final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
 
-    stats.accumulate(
-        assign(
-            params.getReplicationManager(),
-            expectedReplicants,
-            totalReplicants,
-            analyzer,
-            serverHolderList,
-            segment
-        )
-    );
+    if (params.getAvailableSegments().contains(segment)) {
+      stats.accumulate(
+          assign(
+              params.getReplicationManager(),
+              expectedReplicants,
+              totalReplicants,
+              analyzer,
+              serverHolderList,
+              segment
+          )
+      );
+    }
 
     stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));

File: Rule.java

@@ -35,7 +35,9 @@ import org.joda.time.DateTime;
     @JsonSubTypes.Type(name = "loadByPeriod", value = PeriodLoadRule.class),
     @JsonSubTypes.Type(name = "loadByInterval", value = IntervalLoadRule.class),
     @JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class),
-    @JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class)
+    @JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class),
+    @JsonSubTypes.Type(name = "loadBySize", value = SizeLoadRule.class),
+    @JsonSubTypes.Type(name = "dropBySize", value = SizeDropRule.class)
 })
 public interface Rule
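
Note: with the two new subtypes registered, size-based rules ride the existing JSON rule syntax. A minimal sketch, assuming Rule also carries a matching @JsonTypeInfo(property = "type") annotation (not shown in this hunk); the bounds and tier name are illustrative:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.metamx.druid.master.rules.Rule;

    public class SizeRuleJsonDemo
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        Rule rule = mapper.readValue(
            "{\"type\": \"loadBySize\", \"low\": 0, \"high\": 50000000, \"replicants\": 2, \"tier\": \"hot\"}",
            Rule.class
        );
        System.out.println(rule.getType()); // prints "loadBySize"
      }
    }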

File: SizeDropRule.java (new file)

@@ -0,0 +1,71 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.master.rules;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Range;
+import com.metamx.druid.client.DataSegment;
+import org.joda.time.DateTime;
+
+/**
+ */
+public class SizeDropRule extends DropRule
+{
+  private final long low;
+  private final long high;
+  private final Range<Long> range;
+
+  @JsonCreator
+  public SizeDropRule(
+      @JsonProperty("low") long low,
+      @JsonProperty("high") long high
+  )
+  {
+    this.low = low;
+    this.high = high;
+    this.range = Range.closedOpen(low, high);
+  }
+
+  @Override
+  @JsonProperty
+  public String getType()
+  {
+    return "dropBySize";
+  }
+
+  @JsonProperty
+  public long getLow()
+  {
+    return low;
+  }
+
+  @JsonProperty
+  public long getHigh()
+  {
+    return high;
+  }
+
+  @Override
+  public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
+  {
+    return range.contains(segment.getSize());
+  }
+}

File: SizeLoadRule.java (new file)

@@ -0,0 +1,96 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.master.rules;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Range;
+import com.metamx.druid.client.DataSegment;
+import org.joda.time.DateTime;
+
+/**
+ */
+public class SizeLoadRule extends LoadRule
+{
+  private final long low;
+  private final long high;
+  private final Integer replicants;
+  private final String tier;
+  private final Range<Long> range;
+
+  @JsonCreator
+  public SizeLoadRule(
+      @JsonProperty("low") long low,
+      @JsonProperty("high") long high,
+      @JsonProperty("replicants") Integer replicants,
+      @JsonProperty("tier") String tier
+  )
+  {
+    this.low = low;
+    this.high = high;
+    this.replicants = replicants;
+    this.tier = tier;
+    this.range = Range.closedOpen(low, high);
+  }
+
+  @Override
+  @JsonProperty
+  public int getReplicants()
+  {
+    return replicants;
+  }
+
+  @Override
+  public int getReplicants(String tier)
+  {
+    return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0;
+  }
+
+  @Override
+  @JsonProperty
+  public String getTier()
+  {
+    return tier;
+  }
+
+  @Override
+  public String getType()
+  {
+    return "loadBySize";
+  }
+
+  @JsonProperty
+  public long getLow()
+  {
+    return low;
+  }
+
+  @JsonProperty
+  public long getHigh()
+  {
+    return high;
+  }
+
+  @Override
+  public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
+  {
+    return range.contains(segment.getSize());
+  }
+}
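
Note: both new rules delegate to Guava's Range.closedOpen, so the low bound is inclusive and the high bound exclusive; a segment whose size equals high does not match and falls through to the next rule. A small illustration (the bounds are arbitrary):

    import com.google.common.collect.Range;

    Range<Long> range = Range.closedOpen(0L, 1000L);
    range.contains(0L);    // true  -- the low end is included
    range.contains(999L);  // true
    range.contains(1000L); // false -- the high end is excluded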