From ca7335f45fa6e8f1b95c770e63ef56939a5c6937 Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 13 Dec 2013 13:35:22 -0800 Subject: [PATCH] things dont work --- .../Tutorial:-Loading-Your-Data-Part-1.md | 6 +- .../java/io/druid/client/DruidServer.java | 2 + .../io/druid/client/DruidServerConfig.java | 9 ++- .../java/io/druid/db/DatabaseRuleManager.java | 9 ++- .../server/coordinator/DruidCoordinator.java | 9 ++- .../server/coordinator/LoadPeonCallback.java | 4 +- .../DruidCoordinatorBalancer.java | 12 ++- .../{ => helper}/DruidCoordinatorCleanup.java | 12 ++- .../{ => helper}/DruidCoordinatorHelper.java | 4 +- .../{ => helper}/DruidCoordinatorLogger.java | 7 +- .../DruidCoordinatorRuleRunner.java | 7 +- .../DruidCoordinatorSegmentInfoLoader.java | 4 +- .../DruidCoordinatorSegmentMerger.java | 5 +- .../DruidCoordinatorVersionConverter.java | 66 +++++++++++++++++ .../coordinator/rules/ForeverDropRule.java | 40 ++++++++++ .../coordinator/rules/ForeverLoadRule.java | 73 +++++++++++++++++++ .../server/coordinator/rules/LoadRule.java | 4 +- .../druid/server/coordinator/rules/Rule.java | 1 + .../server/http/CoordinatorResource.java | 4 +- .../DruidCoordinatorBalancerProfiler.java | 1 + .../DruidCoordinatorBalancerTester.java | 3 +- .../DruidCoordinatorRuleRunnerTest.java | 1 + .../DruidCoordinatorSegmentMergerTest.java | 1 + 23 files changed, 259 insertions(+), 25 deletions(-) rename server/src/main/java/io/druid/server/coordinator/{ => helper}/DruidCoordinatorBalancer.java (92%) rename server/src/main/java/io/druid/server/coordinator/{ => helper}/DruidCoordinatorCleanup.java (89%) rename server/src/main/java/io/druid/server/coordinator/{ => helper}/DruidCoordinatorHelper.java (89%) rename server/src/main/java/io/druid/server/coordinator/{ => helper}/DruidCoordinatorLogger.java (96%) rename server/src/main/java/io/druid/server/coordinator/{ => helper}/DruidCoordinatorRuleRunner.java (92%) rename server/src/main/java/io/druid/server/coordinator/{ => helper}/DruidCoordinatorSegmentInfoLoader.java (91%) rename server/src/main/java/io/druid/server/coordinator/{ => helper}/DruidCoordinatorSegmentMerger.java (98%) create mode 100644 server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorVersionConverter.java create mode 100644 server/src/main/java/io/druid/server/coordinator/rules/ForeverDropRule.java create mode 100644 server/src/main/java/io/druid/server/coordinator/rules/ForeverLoadRule.java diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md index 2c56c81a839..8c77aba0d07 100644 --- a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md +++ b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md @@ -218,9 +218,9 @@ Congratulations! The segment has completed building. Once a segment is built, a You should see the following logs on the coordinator: ```bash -2013-10-09 21:41:54,368 INFO [Coordinator-Exec--0] io.druid.server.coordinator.DruidCoordinatorLogger - [_default_tier] : Assigned 1 segments among 1 servers -2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.DruidCoordinatorLogger - Load Queues: -2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.DruidCoordinatorLogger - Server[localhost:8081, historical, _default_tier] has 1 left to load, 0 left to drop, 4,477 bytes queued, 4,477 bytes served. +2013-10-09 21:41:54,368 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - [_default_tier] : Assigned 1 segments among 1 servers +2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Load Queues: +2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Server[localhost:8081, historical, _default_tier] has 1 left to load, 0 left to drop, 4,477 bytes queued, 4,477 bytes served. ``` These logs indicate that the coordinator has assigned our new segment to the historical node to download and serve. If you look at the historical node logs, you should see: diff --git a/server/src/main/java/io/druid/client/DruidServer.java b/server/src/main/java/io/druid/client/DruidServer.java index 53acc501513..23876497548 100644 --- a/server/src/main/java/io/druid/client/DruidServer.java +++ b/server/src/main/java/io/druid/client/DruidServer.java @@ -36,7 +36,9 @@ import java.util.concurrent.ConcurrentMap; */ public class DruidServer implements Comparable { + public static final int DEFAULT_NUM_REPLICANTS = 2; public static final String DEFAULT_TIER = "_default_tier"; + private static final Logger log = new Logger(DruidServer.class); private final Object lock = new Object(); diff --git a/server/src/main/java/io/druid/client/DruidServerConfig.java b/server/src/main/java/io/druid/client/DruidServerConfig.java index 089c05ff021..a990d56ef15 100644 --- a/server/src/main/java/io/druid/client/DruidServerConfig.java +++ b/server/src/main/java/io/druid/client/DruidServerConfig.java @@ -32,7 +32,10 @@ public class DruidServerConfig private long maxSize = 0; @JsonProperty - private String tier = "_default_tier"; + private String tier = DruidServer.DEFAULT_TIER; + + @JsonProperty + private String zone = DruidServer.DEFAULT_ZONE; public long getMaxSize() { @@ -43,4 +46,8 @@ public class DruidServerConfig { return tier; } + + public String getZone() { + return zone; + } } diff --git a/server/src/main/java/io/druid/db/DatabaseRuleManager.java b/server/src/main/java/io/druid/db/DatabaseRuleManager.java index 0ba44c52c85..9c5efde4fcd 100644 --- a/server/src/main/java/io/druid/db/DatabaseRuleManager.java +++ b/server/src/main/java/io/druid/db/DatabaseRuleManager.java @@ -31,9 +31,11 @@ import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; +import io.druid.client.DruidServer; import io.druid.concurrent.Execs; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Json; +import io.druid.server.coordinator.rules.ForeverLoadRule; import io.druid.server.coordinator.rules.PeriodLoadRule; import io.druid.server.coordinator.rules.Rule; import org.joda.time.DateTime; @@ -86,10 +88,9 @@ public class DatabaseRuleManager } final List defaultRules = Arrays.asList( - new PeriodLoadRule( - new Period("P5000Y"), - 2, - "_default_tier" + new ForeverLoadRule( + DruidServer.DEFAULT_NUM_REPLICANTS, + DruidServer.DEFAULT_TIER ) ); final String version = new DateTime().toString(); diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 0ae18185015..993843a0a27 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -50,6 +50,13 @@ import io.druid.db.DatabaseRuleManager; import io.druid.db.DatabaseSegmentManager; import io.druid.guice.ManageLifecycle; import io.druid.segment.IndexIO; +import io.druid.server.coordinator.helper.DruidCoordinatorBalancer; +import io.druid.server.coordinator.helper.DruidCoordinatorCleanup; +import io.druid.server.coordinator.helper.DruidCoordinatorHelper; +import io.druid.server.coordinator.helper.DruidCoordinatorLogger; +import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; +import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader; +import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; @@ -310,7 +317,7 @@ public class DruidCoordinator new LoadPeonCallback() { @Override - protected void execute() + public void execute() { try { if (curator.checkExists().forPath(toServedSegPath) != null && diff --git a/server/src/main/java/io/druid/server/coordinator/LoadPeonCallback.java b/server/src/main/java/io/druid/server/coordinator/LoadPeonCallback.java index ff0cea085ca..eaf3b0267d7 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadPeonCallback.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadPeonCallback.java @@ -21,7 +21,7 @@ package io.druid.server.coordinator; /** */ -public abstract class LoadPeonCallback +public interface LoadPeonCallback { - protected abstract void execute(); + public void execute(); } diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java similarity index 92% rename from server/src/main/java/io/druid/server/coordinator/DruidCoordinatorBalancer.java rename to server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index 0ca4ff32fe8..bc960258400 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.server.coordinator; +package io.druid.server.coordinator.helper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -25,6 +25,14 @@ import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.common.guava.Comparators; import com.metamx.emitter.EmittingLogger; import io.druid.client.DruidServer; +import io.druid.server.coordinator.BalancerSegmentHolder; +import io.druid.server.coordinator.BalancerStrategy; +import io.druid.server.coordinator.CoordinatorStats; +import io.druid.server.coordinator.DruidCoordinator; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import io.druid.server.coordinator.LoadPeonCallback; +import io.druid.server.coordinator.LoadQueuePeon; +import io.druid.server.coordinator.ServerHolder; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; @@ -163,7 +171,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper callback = new LoadPeonCallback() { @Override - protected void execute() + public void execute() { Map movingSegments = currentlyMovingSegments.get(toServer.getTier()); if (movingSegments != null) { diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorCleanup.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanup.java similarity index 89% rename from server/src/main/java/io/druid/server/coordinator/DruidCoordinatorCleanup.java rename to server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanup.java index 659391514aa..33438e204e1 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorCleanup.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanup.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.server.coordinator; +package io.druid.server.coordinator.helper; import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; @@ -25,6 +25,14 @@ import com.metamx.common.guava.Comparators; import com.metamx.common.logger.Logger; import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; +import io.druid.server.coordinator.CoordinatorStats; +import io.druid.server.coordinator.DruidCluster; +import io.druid.server.coordinator.DruidCoordinator; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import io.druid.server.coordinator.LoadPeonCallback; +import io.druid.server.coordinator.LoadQueuePeon; +import io.druid.server.coordinator.ServerHolder; +import io.druid.server.coordinator.helper.DruidCoordinatorHelper; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; @@ -69,7 +77,7 @@ public class DruidCoordinatorCleanup implements DruidCoordinatorHelper segment, new LoadPeonCallback() { @Override - protected void execute() + public void execute() { } } diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorHelper.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorHelper.java similarity index 89% rename from server/src/main/java/io/druid/server/coordinator/DruidCoordinatorHelper.java rename to server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorHelper.java index e6eb7bb3997..26709744ab9 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorHelper.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorHelper.java @@ -17,7 +17,9 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.server.coordinator; +package io.druid.server.coordinator.helper; + +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; /** */ diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorLogger.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java similarity index 96% rename from server/src/main/java/io/druid/server/coordinator/DruidCoordinatorLogger.java rename to server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java index b6c8bb8c240..aa03e808f17 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorLogger.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.server.coordinator; +package io.druid.server.coordinator.helper; import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; @@ -27,6 +27,11 @@ import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; import io.druid.collections.CountingMap; +import io.druid.server.coordinator.CoordinatorStats; +import io.druid.server.coordinator.DruidCluster; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import io.druid.server.coordinator.LoadQueuePeon; +import io.druid.server.coordinator.ServerHolder; import io.druid.timeline.DataSegment; import java.util.Map; diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuleRunner.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java similarity index 92% rename from server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuleRunner.java rename to server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index 594889201cf..15edf27d428 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -17,10 +17,15 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.server.coordinator; +package io.druid.server.coordinator.helper; import com.metamx.emitter.EmittingLogger; import io.druid.db.DatabaseRuleManager; +import io.druid.server.coordinator.CoordinatorStats; +import io.druid.server.coordinator.DruidCluster; +import io.druid.server.coordinator.DruidCoordinator; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import io.druid.server.coordinator.ReplicationThrottler; import io.druid.server.coordinator.rules.Rule; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorSegmentInfoLoader.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java similarity index 91% rename from server/src/main/java/io/druid/server/coordinator/DruidCoordinatorSegmentInfoLoader.java rename to server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java index 69f8ccdbc43..8980bfded90 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorSegmentInfoLoader.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java @@ -17,9 +17,11 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.server.coordinator; +package io.druid.server.coordinator.helper; import com.metamx.common.logger.Logger; +import io.druid.server.coordinator.DruidCoordinator; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; import io.druid.timeline.DataSegment; import java.util.Set; diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorSegmentMerger.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java similarity index 98% rename from server/src/main/java/io/druid/server/coordinator/DruidCoordinatorSegmentMerger.java rename to server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java index 839cda93563..d39d1bbaac9 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorSegmentMerger.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.server.coordinator; +package io.druid.server.coordinator.helper; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -33,6 +33,9 @@ import com.metamx.common.Pair; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; import io.druid.client.indexing.IndexingServiceClient; +import io.druid.server.coordinator.CoordinatorStats; +import io.druid.server.coordinator.DatasourceWhitelist; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorVersionConverter.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorVersionConverter.java new file mode 100644 index 00000000000..70bef7c2216 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorVersionConverter.java @@ -0,0 +1,66 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.coordinator.helper; + +import com.metamx.emitter.EmittingLogger; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.segment.IndexIO; +import io.druid.server.coordinator.DatasourceWhitelist; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import io.druid.timeline.DataSegment; + +import java.util.concurrent.atomic.AtomicReference; + +public class DruidCoordinatorVersionConverter implements DruidCoordinatorHelper +{ + private static final EmittingLogger log = new EmittingLogger(DruidCoordinatorVersionConverter.class); + + + private final IndexingServiceClient indexingServiceClient; + private final AtomicReference whitelistRef; + + public DruidCoordinatorVersionConverter( + IndexingServiceClient indexingServiceClient, + AtomicReference whitelistRef + ) + { + this.indexingServiceClient = indexingServiceClient; + this.whitelistRef = whitelistRef; + } + + @Override + public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + { + DatasourceWhitelist whitelist = whitelistRef.get(); + + for (DataSegment dataSegment : params.getAvailableSegments()) { + if (whitelist == null || whitelist.contains(dataSegment.getDataSource())) { + final Integer binaryVersion = dataSegment.getBinaryVersion(); + + if (binaryVersion == null || binaryVersion < IndexIO.CURRENT_VERSION_ID) { + log.info("Upgrading version on segment[%s]", dataSegment.getIdentifier()); + indexingServiceClient.upgradeSegment(dataSegment); + } + } + } + + return params; + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/rules/ForeverDropRule.java b/server/src/main/java/io/druid/server/coordinator/rules/ForeverDropRule.java new file mode 100644 index 00000000000..fe86a602055 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/rules/ForeverDropRule.java @@ -0,0 +1,40 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.coordinator.rules; + +import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; + +/** + */ +public class ForeverDropRule extends DropRule +{ + @Override + public String getType() + { + return "dropForever"; + } + + @Override + public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) + { + return true; + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/rules/ForeverLoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/ForeverLoadRule.java new file mode 100644 index 00000000000..d826c572ad0 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/rules/ForeverLoadRule.java @@ -0,0 +1,73 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.coordinator.rules; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; + +/** + */ +public class ForeverLoadRule extends LoadRule +{ + private final Integer replicants; + private final String tier; + + @JsonCreator + public ForeverLoadRule( + @JsonProperty("replicants") Integer replicants, + @JsonProperty("tier") String tier + ) + { + this.replicants = (replicants == null) ? 2 : replicants; + this.tier = tier; + } + + @Override + public int getReplicants() + { + return replicants; + } + + @Override + public int getReplicants(String tier) + { + return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0; + } + + @Override + public String getTier() + { + return null; + } + + @Override + public String getType() + { + return "loadForever"; + } + + @Override + public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) + { + return true; + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index 792d76d01b2..98ee7cfb206 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -120,7 +120,7 @@ public abstract class LoadRule implements Rule new LoadPeonCallback() { @Override - protected void execute() + public void execute() { replicationManager.unregisterReplicantCreation( getTier(), @@ -197,7 +197,7 @@ public abstract class LoadRule implements Rule new LoadPeonCallback() { @Override - protected void execute() + public void execute() { replicationManager.unregisterReplicantTermination( getTier(), diff --git a/server/src/main/java/io/druid/server/coordinator/rules/Rule.java b/server/src/main/java/io/druid/server/coordinator/rules/Rule.java index 7c720e1aa2f..e8be9a81102 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/Rule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/Rule.java @@ -33,6 +33,7 @@ import org.joda.time.DateTime; @JsonSubTypes(value = { @JsonSubTypes.Type(name = "loadByPeriod", value = PeriodLoadRule.class), @JsonSubTypes.Type(name = "loadByInterval", value = IntervalLoadRule.class), + @JsonSubTypes.Type(name = "loadForever", value = ForeverLoadRule.class), @JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class), @JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class), @JsonSubTypes.Type(name = "loadBySize", value = SizeLoadRule.class), diff --git a/server/src/main/java/io/druid/server/http/CoordinatorResource.java b/server/src/main/java/io/druid/server/http/CoordinatorResource.java index 1c70dd39c8c..5946a57d385 100644 --- a/server/src/main/java/io/druid/server/http/CoordinatorResource.java +++ b/server/src/main/java/io/druid/server/http/CoordinatorResource.java @@ -61,7 +61,7 @@ public class CoordinatorResource new LoadPeonCallback() { @Override - protected void execute() + public void execute() { return; } @@ -91,7 +91,7 @@ public class CoordinatorResource segmentToDrop.getFromServer(), segmentToDrop.getSegmentName(), new LoadPeonCallback() { @Override - protected void execute() + public void execute() { return; } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java index 2d95fec765b..6f931c286ff 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java @@ -29,6 +29,7 @@ import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.DruidServer; import io.druid.db.DatabaseRuleManager; +import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; import io.druid.server.coordinator.rules.PeriodLoadRule; import io.druid.server.coordinator.rules.Rule; import io.druid.timeline.DataSegment; diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTester.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTester.java index 77808f2fe86..06af063897d 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTester.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTester.java @@ -20,6 +20,7 @@ package io.druid.server.coordinator; import io.druid.client.DruidServer; +import io.druid.server.coordinator.helper.DruidCoordinatorBalancer; import io.druid.timeline.DataSegment; public class DruidCoordinatorBalancerTester extends DruidCoordinatorBalancer @@ -59,7 +60,7 @@ public class DruidCoordinatorBalancerTester extends DruidCoordinatorBalancer loadPeon.loadSegment(segment.getSegment(), new LoadPeonCallback() { @Override - protected void execute() + public void execute() { } }); diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 9c8fdf0de2a..72693e65103 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -31,6 +31,7 @@ import com.metamx.emitter.service.ServiceEventBuilder; import io.druid.client.DruidServer; import io.druid.db.DatabaseRuleManager; import io.druid.segment.IndexIO; +import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; import io.druid.server.coordinator.rules.IntervalLoadRule; import io.druid.server.coordinator.rules.IntervalDropRule; import io.druid.server.coordinator.rules.Rule; diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java index 72aeb40ab46..b6cc82cd354 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import io.druid.client.indexing.IndexingServiceClient; +import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.LinearShardSpec; import junit.framework.Assert;