From 15a4dae2b17c605c7812eed7bbd464bbd8e6f319 Mon Sep 17 00:00:00 2001 From: fjy Date: Wed, 26 Jun 2013 10:45:54 -0700 Subject: [PATCH] bunch of issue fixes --- .../java/com/metamx/druid/QueryableNode.java | 5 +- .../metamx/druid/http/NoopRequestLogger.java | 31 ++++++ .../SegmentMetadataQueryQueryToolChest.java | 7 +- .../aggregation/CountAggregatorFactory.java | 3 + .../aggregation/DoubleSumAggregator.java | 5 +- .../DoubleSumAggregatorFactory.java | 4 + .../HistogramAggregatorFactory.java | 30 +++--- .../JavaScriptAggregatorFactory.java | 38 +++++--- .../aggregation/LongSumAggregatorFactory.java | 4 + .../aggregation/MaxAggregatorFactory.java | 4 + .../aggregation/MinAggregatorFactory.java | 4 + .../plumber/RealtimePlumberSchool.java | 67 ++++++++----- .../druid/master/DruidMasterBalancer.java | 15 +-- .../metamx/druid/master/rules/LoadRule.java | 22 +++-- .../com/metamx/druid/master/rules/Rule.java | 4 +- .../druid/master/rules/SizeDropRule.java | 71 ++++++++++++++ .../druid/master/rules/SizeLoadRule.java | 96 +++++++++++++++++++ 17 files changed, 338 insertions(+), 72 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/http/NoopRequestLogger.java create mode 100644 server/src/main/java/com/metamx/druid/master/rules/SizeDropRule.java create mode 100644 server/src/main/java/com/metamx/druid/master/rules/SizeLoadRule.java diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java index 68f978929d6..d01032817f7 100644 --- a/client/src/main/java/com/metamx/druid/QueryableNode.java +++ b/client/src/main/java/com/metamx/druid/QueryableNode.java @@ -43,6 +43,7 @@ import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DruidServerMetadata; import com.metamx.druid.curator.announcement.Announcer; +import com.metamx.druid.http.NoopRequestLogger; import com.metamx.druid.http.RequestLogger; import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.Initialization; @@ -374,12 +375,14 @@ public abstract class QueryableNode extends Registering getEmitter() )); } - else { + else if ("file".equalsIgnoreCase(loggingType)) { setRequestLogger(Initialization.makeFileRequestLogger( getJsonMapper(), getScheduledExecutorFactory(), getProps() )); + } else { + setRequestLogger(new NoopRequestLogger()); } } catch (IOException e) { diff --git a/client/src/main/java/com/metamx/druid/http/NoopRequestLogger.java b/client/src/main/java/com/metamx/druid/http/NoopRequestLogger.java new file mode 100644 index 00000000000..ddc422c91be --- /dev/null +++ b/client/src/main/java/com/metamx/druid/http/NoopRequestLogger.java @@ -0,0 +1,31 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.http; + +/** + */ +public class NoopRequestLogger implements RequestLogger +{ + @Override + public void log(RequestLogLine requestLogLine) throws Exception + { + // do nothing + } +} diff --git a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 2210a0f5a7b..1cfceac4db8 100644 --- a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -46,13 +46,16 @@ import org.joda.time.Minutes; import javax.annotation.Nullable; import java.nio.ByteBuffer; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; public class SegmentMetadataQueryQueryToolChest extends QueryToolChest { - private static final TypeReference TYPE_REFERENCE = new TypeReference(){}; + private static final TypeReference TYPE_REFERENCE = new TypeReference() + { + }; private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4}; @Override @@ -228,6 +231,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest breaksList ) { + Preconditions.checkNotNull(name, "Must have a valid, nonl null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName"); + this.name = name; this.fieldName = fieldName; - this.breaksList = breaksList; - this.breaks = new float[breaksList.size()]; - for(int i = 0; i < breaksList.size(); ++i) this.breaks[i] = breaksList.get(i); + this.breaksList = (breaksList == null) ? Lists.newArrayList() :breaksList; + this.breaks = new float[this.breaksList.size()]; + for (int i = 0; i < this.breaksList.size(); ++i) { + this.breaks[i] = this.breaksList.get(i); + } } + @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { @@ -95,14 +103,12 @@ public class HistogramAggregatorFactory implements AggregatorFactory @Override public Object deserialize(Object object) { - if (object instanceof byte []) { - return Histogram.fromBytes((byte []) object); - } - else if (object instanceof ByteBuffer) { + if (object instanceof byte[]) { + return Histogram.fromBytes((byte[]) object); + } else if (object instanceof ByteBuffer) { return Histogram.fromBytes((ByteBuffer) object); - } - else if(object instanceof String) { - byte[] bytes = Base64.decodeBase64(((String)object).getBytes(Charsets.UTF_8)); + } else if (object instanceof String) { + byte[] bytes = Base64.decodeBase64(((String) object).getBytes(Charsets.UTF_8)); return Histogram.fromBytes(bytes); } return object; @@ -111,7 +117,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory @Override public Object finalizeComputation(Object object) { - return ((Histogram)object).asVisual(); + return ((Histogram) object).asVisual(); } @Override @@ -149,7 +155,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory @Override public String getTypeName() { - throw new UnsupportedOperationException("HistogramAggregatorFactory does not support getTypeName()"); + throw new UnsupportedOperationException("HistogramAggregatorFactory does not support getTypeName()"); } @Override diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java index af68b26df42..7e4c1a66c6a 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Charsets; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.primitives.Doubles; import com.metamx.druid.processing.ColumnSelectorFactory; @@ -51,7 +52,6 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory private final String fnCombine; - private final JavaScriptAggregator.ScriptAggregator compiledScript; @JsonCreator @@ -63,6 +63,12 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory @JsonProperty("fnCombine") final String fnCombine ) { + Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name"); + Preconditions.checkNotNull(fieldNames, "Must have a valid, non null fieldNames"); + Preconditions.checkNotNull(fnAggregate, "Must have a valid, non null fnAggregate"); + Preconditions.checkNotNull(fnReset, "Must have a valid, non null fnReset"); + Preconditions.checkNotNull(fnCombine, "Must have a valid, non null fnCombine"); + this.name = name; this.fieldNames = fieldNames; @@ -83,7 +89,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory new com.google.common.base.Function() { @Override - public ObjectColumnSelector apply(@Nullable String s) { + public ObjectColumnSelector apply(@Nullable String s) + { return columnFactory.makeObjectColumnSelector(s); } } @@ -101,7 +108,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory new com.google.common.base.Function() { @Override - public ObjectColumnSelector apply(@Nullable String s) { + public ObjectColumnSelector apply(@Nullable String s) + { return columnSelectorFactory.makeObjectColumnSelector(s); } } @@ -148,7 +156,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory } @JsonProperty - public List getFieldNames() { + public List getFieldNames() + { return fieldNames; } @@ -182,7 +191,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory try { MessageDigest md = MessageDigest.getInstance("SHA-1"); byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes(Charsets.UTF_8); - byte[] sha1 = md.digest((fnAggregate+fnReset+fnCombine).getBytes(Charsets.UTF_8)); + byte[] sha1 = md.digest((fnAggregate + fnReset + fnCombine).getBytes(Charsets.UTF_8)); return ByteBuffer.allocate(1 + fieldNameBytes.length + sha1.length) .put(CACHE_TYPE_ID) @@ -225,7 +234,11 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory '}'; } - public static JavaScriptAggregator.ScriptAggregator compileScript(final String aggregate, final String reset, final String combine) + public static JavaScriptAggregator.ScriptAggregator compileScript( + final String aggregate, + final String reset, + final String combine + ) { final ContextFactory contextFactory = ContextFactory.getGlobal(); Context context = contextFactory.enterContext(); @@ -234,8 +247,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory final ScriptableObject scope = context.initStandardObjects(); final Function fnAggregate = context.compileFunction(scope, aggregate, "aggregate", 1, null); - final Function fnReset = context.compileFunction(scope, reset, "reset", 1, null); - final Function fnCombine = context.compileFunction(scope, combine, "combine", 1, null); + final Function fnReset = context.compileFunction(scope, reset, "reset", 1, null); + final Function fnCombine = context.compileFunction(scope, combine, "combine", 1, null); Context.exit(); return new JavaScriptAggregator.ScriptAggregator() @@ -244,7 +257,9 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory public double aggregate(final double current, final ObjectColumnSelector[] selectorList) { Context cx = Context.getCurrentContext(); - if(cx == null) cx = contextFactory.enterContext(); + if (cx == null) { + cx = contextFactory.enterContext(); + } final int size = selectorList.length; final Object[] args = new Object[size + 1]; @@ -292,8 +307,9 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory } @Override - public void close() { - if(Context.getCurrentContext() != null) { + public void close() + { + if (Context.getCurrentContext() != null) { Context.exit(); } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregatorFactory.java index d84e22041db..1aeb5e6471d 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregatorFactory.java @@ -21,6 +21,7 @@ package com.metamx.druid.aggregation; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import com.google.common.primitives.Longs; import com.metamx.druid.processing.ColumnSelectorFactory; @@ -44,6 +45,9 @@ public class LongSumAggregatorFactory implements AggregatorFactory @JsonProperty("fieldName") final String fieldName ) { + Preconditions.checkNotNull(name, "Must have a valid, nonl null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName"); + this.name = name; this.fieldName = fieldName; } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MaxAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/MaxAggregatorFactory.java index fd66aeae90d..dda25b355d7 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MaxAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MaxAggregatorFactory.java @@ -21,6 +21,7 @@ package com.metamx.druid.aggregation; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import com.google.common.primitives.Doubles; import com.metamx.druid.processing.ColumnSelectorFactory; @@ -44,6 +45,9 @@ public class MaxAggregatorFactory implements AggregatorFactory @JsonProperty("fieldName") final String fieldName ) { + Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName"); + this.name = name; this.fieldName = fieldName; } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MinAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/MinAggregatorFactory.java index ed1e82cc9b5..5b92072be07 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MinAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MinAggregatorFactory.java @@ -21,6 +21,7 @@ package com.metamx.druid.aggregation; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import com.google.common.primitives.Doubles; import com.metamx.druid.processing.ColumnSelectorFactory; @@ -44,6 +45,9 @@ public class MinAggregatorFactory implements AggregatorFactory @JsonProperty("fieldName") final String fieldName ) { + Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName"); + this.name = name; this.fieldName = fieldName; } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java index cee1bfb00ff..5ba229ea56a 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java @@ -55,6 +55,9 @@ import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryRunnerFactory; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.query.QueryToolChest; +import com.metamx.druid.query.segment.SegmentDescriptor; +import com.metamx.druid.query.segment.SpecificSegmentQueryRunner; +import com.metamx.druid.query.segment.SpecificSegmentSpec; import com.metamx.druid.realtime.FireDepartmentMetrics; import com.metamx.druid.realtime.FireHydrant; import com.metamx.druid.realtime.Schema; @@ -253,23 +256,32 @@ public class RealtimePlumberSchool implements PlumberSchool new Function>() { @Override - public QueryRunner apply(@Nullable Sink input) + public QueryRunner apply(Sink input) { - return new MetricsEmittingQueryRunner( - emitter, - builderFn, - factory.mergeRunners( - EXEC, - Iterables.transform( - input, - new Function>() - { - @Override - public QueryRunner apply(@Nullable FireHydrant input) - { - return factory.createRunner(input.getSegment()); - } - } + return new SpecificSegmentQueryRunner( + new MetricsEmittingQueryRunner( + emitter, + builderFn, + factory.mergeRunners( + EXEC, + Iterables.transform( + input, + new Function>() + { + @Override + public QueryRunner apply(FireHydrant input) + { + return factory.createRunner(input.getSegment()); + } + } + ) + ) + ), + new SpecificSegmentSpec( + new SegmentDescriptor( + input.getInterval(), + input.getSegment().getVersion(), + input.getSegment().getShardSpec().getPartitionNum() ) ) ); @@ -380,12 +392,16 @@ public class RealtimePlumberSchool implements PlumberSchool //final File[] sinkFiles = sinkDir.listFiles(); // To avoid reading and listing of "merged" dir - final File[] sinkFiles = sinkDir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String fileName) { - return !(Ints.tryParse(fileName) == null); - } - }); + final File[] sinkFiles = sinkDir.listFiles( + new FilenameFilter() + { + @Override + public boolean accept(File dir, String fileName) + { + return !(Ints.tryParse(fileName) == null); + } + } + ); Arrays.sort( sinkFiles, new Comparator() @@ -408,15 +424,14 @@ public class RealtimePlumberSchool implements PlumberSchool List hydrants = Lists.newArrayList(); for (File segmentDir : sinkFiles) { log.info("Loading previously persisted segment at [%s]", segmentDir); - + // Although this has been tackled at start of this method. // Just a doubly-check added to skip "merged" dir. from being added to hydrants // If 100% sure that this is not needed, this check can be removed. - if(Ints.tryParse(segmentDir.getName()) == null) - { + if (Ints.tryParse(segmentDir.getName()) == null) { continue; } - + hydrants.add( new FireHydrant( new QueryableIndexSegment(null, IndexIO.loadIndex(segmentDir)), diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index 9f031146cce..05e5992466d 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -112,15 +112,16 @@ public class DruidMasterBalancer implements DruidMasterHelper continue; } - int iter = 0; - while (iter < maxSegmentsToMove) { - iter++; + for (int iter = 0; iter < maxSegmentsToMove; iter++) { final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList); - final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList); - if (holder == null) { - continue; + + if (params.getAvailableSegments().contains(segmentToMove.getSegment())) { + final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList); + + if (holder != null) { + moveSegment(segmentToMove, holder.getServer(), params); + } } - moveSegment(segmentToMove, holder.getServer(), params); } final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList); diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index affc0b93bdd..9677b5028bc 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -62,16 +62,18 @@ public abstract class LoadRule implements Rule final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp(); final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp); - stats.accumulate( - assign( - params.getReplicationManager(), - expectedReplicants, - totalReplicants, - analyzer, - serverHolderList, - segment - ) - ); + if (params.getAvailableSegments().contains(segment)) { + stats.accumulate( + assign( + params.getReplicationManager(), + expectedReplicants, + totalReplicants, + analyzer, + serverHolderList, + segment + ) + ); + } stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params)); diff --git a/server/src/main/java/com/metamx/druid/master/rules/Rule.java b/server/src/main/java/com/metamx/druid/master/rules/Rule.java index efc6a91c5ab..1c77a0ebc8f 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/Rule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/Rule.java @@ -35,7 +35,9 @@ import org.joda.time.DateTime; @JsonSubTypes.Type(name = "loadByPeriod", value = PeriodLoadRule.class), @JsonSubTypes.Type(name = "loadByInterval", value = IntervalLoadRule.class), @JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class), - @JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class) + @JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class), + @JsonSubTypes.Type(name = "loadBySize", value = SizeLoadRule.class), + @JsonSubTypes.Type(name = "dropBySize", value = SizeDropRule.class) }) public interface Rule diff --git a/server/src/main/java/com/metamx/druid/master/rules/SizeDropRule.java b/server/src/main/java/com/metamx/druid/master/rules/SizeDropRule.java new file mode 100644 index 00000000000..0bd9f94cd00 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/master/rules/SizeDropRule.java @@ -0,0 +1,71 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.master.rules; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Range; +import com.metamx.druid.client.DataSegment; +import org.joda.time.DateTime; + +/** + */ +public class SizeDropRule extends DropRule +{ + private final long low; + private final long high; + private final Range range; + + @JsonCreator + public SizeDropRule( + @JsonProperty("low") long low, + @JsonProperty("high") long high + ) + { + this.low = low; + this.high = high; + this.range = Range.closedOpen(low, high); + } + + @Override + @JsonProperty + public String getType() + { + return "dropBySize"; + } + + @JsonProperty + public long getLow() + { + return low; + } + + @JsonProperty + public long getHigh() + { + return high; + } + + @Override + public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) + { + return range.contains(segment.getSize()); + } +} diff --git a/server/src/main/java/com/metamx/druid/master/rules/SizeLoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/SizeLoadRule.java new file mode 100644 index 00000000000..421432ec6b9 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/master/rules/SizeLoadRule.java @@ -0,0 +1,96 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.master.rules; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Range; +import com.metamx.druid.client.DataSegment; +import org.joda.time.DateTime; + +/** + */ +public class SizeLoadRule extends LoadRule +{ + private final long low; + private final long high; + private final Integer replicants; + private final String tier; + private final Range range; + + @JsonCreator + public SizeLoadRule( + @JsonProperty("low") long low, + @JsonProperty("high") long high, + @JsonProperty("replicants") Integer replicants, + @JsonProperty("tier") String tier + ) + { + this.low = low; + this.high = high; + this.replicants = replicants; + this.tier = tier; + this.range = Range.closedOpen(low, high); + } + + @Override + @JsonProperty + public int getReplicants() + { + return replicants; + } + + @Override + public int getReplicants(String tier) + { + return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0; + } + + @Override + @JsonProperty + public String getTier() + { + return tier; + } + + @Override + public String getType() + { + return "loadBySize"; + } + + @JsonProperty + public long getLow() + { + return low; + } + + @JsonProperty + public long getHigh() + { + return high; + } + + @Override + public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) + { + return range.contains(segment.getSize()); + } +}