refactor master to run rules before cleaning up; more master stats; general improvements

Fangjin Yang 2012-12-03 14:43:04 -08:00
parent 91cd9e9536
commit 5822f4f5f7
34 changed files with 886 additions and 1181 deletions

View File

@ -58,7 +58,7 @@ public class DruidServer implements Comparable
config.getHost(),
config.getMaxSize(),
type,
config.getSubType()
config.getTier()
);
}

View File

@ -35,6 +35,6 @@ public abstract class DruidServerConfig
@Config("druid.server.maxSize")
public abstract long getMaxSize();
@Config("druid.server.subType")
public abstract String getSubType();
@Config("druid.server.tier")
public abstract String getTier();
}
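After the rename, server runtime properties set the tier directly. A minimal example (the values here are illustrative, not from this commit):

druid.server.tier=hot
druid.server.maxSize=10000000000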

View File

@ -35,26 +35,26 @@ public class DbConnector
{
private static final Logger log = new Logger(DbConnector.class);
public static void createSegmentTable(final DBI dbi, final DbConnectorConfig config)
public static void createSegmentTable(final DBI dbi, final String segmentTableName)
{
createTable(
dbi,
config.getSegmentTable(),
segmentTableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
config.getSegmentTable()
segmentTableName
)
);
}
public static void createRuleTable(final DBI dbi, final DbConnectorConfig config)
public static void createRuleTable(final DBI dbi, final String ruleTableName)
{
createTable(
dbi,
config.getRuleTable(),
ruleTableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, ruleVersion TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
config.getRuleTable()
ruleTableName
)
);
}

View File

@ -41,8 +41,4 @@ public abstract class DbConnectorConfig
@JsonProperty("segmentTable")
@Config("druid.database.segmentTable")
public abstract String getSegmentTable();
@JsonProperty("ruleTable")
@Config("druid.database.ruleTable")
public abstract String getRuleTable();
}

View File

@ -38,9 +38,6 @@ public class DbUpdaterJobSpec extends DbConnectorConfig implements UpdaterJobSpe
@JsonProperty("segmentTable")
public String segmentTable;
@JsonProperty("ruleTable")
public String ruleTable;
@Override
public String getDatabaseConnectURI()
{
@ -59,15 +56,8 @@ public class DbUpdaterJobSpec extends DbConnectorConfig implements UpdaterJobSpe
return password;
}
@Override
public String getSegmentTable()
{
return segmentTable;
}
@Override
public String getRuleTable()
{
return ruleTable;
}
}

View File

@ -61,14 +61,14 @@ public class DatabaseRuleCoordinator
public RuleMap getRuleMap()
{
Map<String, List<Rule>> assignmentRules = getRules();
Map<String, List<Rule>> assignmentRules = getAllRules();
return new RuleMap(
assignmentRules,
assignmentRules.get(config.getDefaultDatasource())
);
}
public Map<String, List<Rule>> getRules()
public Map<String, List<Rule>> getAllRules()
{
synchronized (lock) {
return dbi.withHandle(

View File

@ -272,7 +272,7 @@ public class InfoResource
public Response getRules()
{
return Response.status(Response.Status.OK)
.entity(databaseRuleCoordinator.getRules())
.entity(databaseRuleCoordinator.getAllRules())
.build();
}

View File

@ -117,8 +117,8 @@ public class MasterMain
final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
final DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
DbConnector.createSegmentTable(dbi, dbConnectorConfig);
DbConnector.createRuleTable(dbi, dbConnectorConfig);
DbConnector.createSegmentTable(dbi, PropUtils.getProperty(props, "druid.database.segmentTable"));
DbConnector.createRuleTable(dbi, PropUtils.getProperty(props, "druid.database.ruleTable"));
final DatabaseSegmentManager databaseSegmentManager = new DatabaseSegmentManager(
jsonMapper,
scheduledExecutorFactory.create(1, "DatabaseSegmentManager-Exec--%d"),

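Since the table names no longer come from DbConnectorConfig, both properties must be set for the master to start. Illustrative values:

druid.database.segmentTable=prod_segments
druid.database.ruleTable=prod_rules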
View File

@ -80,7 +80,7 @@ public class DruidCluster
return cluster.isEmpty();
}
public boolean isEmpty(String tier)
public boolean hasTier(String tier)
{
MinMaxPriorityQueue<ServerHolder> servers = cluster.get(tier);
return (servers == null) || servers.isEmpty();

View File

@ -20,6 +20,7 @@
package com.metamx.druid.master;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@ -50,6 +51,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@ -602,12 +604,8 @@ public class DruidMaster
// Find all historical servers, group them by tier and sort by ascending usage
final DruidCluster cluster = new DruidCluster();
final Set<String> historicalServers = Sets.newHashSet();
for (DruidServer server : servers) {
if (server.getType().equalsIgnoreCase("historical")) {
historicalServers.add(server.getName());
if (!loadManagementPeons.containsKey(server.getName())) {
String basePath = yp.combineParts(Arrays.asList(config.getLoadQueuePath(), server.getName()));
LoadQueuePeon loadQueuePeon = new LoadQueuePeon(yp, basePath, peonExec);
@ -624,14 +622,24 @@ public class DruidMaster
}
}
SegmentRuleLookup segmentRuleLookup = SegmentRuleLookup.make(
params.getAvailableSegments(),
databaseRuleCoordinator.getRuleMap()
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
// Stop peons for servers that aren't there anymore.
for (String name : Sets.difference(historicalServers, loadManagementPeons.keySet())) {
for (String name : Sets.difference(
Sets.newHashSet(
Collections2.transform(
servers,
new Function<DruidServer, String>()
{
@Override
public String apply(@Nullable DruidServer input)
{
return input.getName();
}
}
)
), loadManagementPeons.keySet()
)) {
log.info("Removing listener for server[%s] which is no longer there.", name);
LoadQueuePeon peon = loadManagementPeons.remove(name);
peon.stop();
@ -639,15 +647,17 @@ public class DruidMaster
yp.unregisterListener(yp.combineParts(Arrays.asList(config.getLoadQueuePath(), name)), peon);
}
decrementRemovedSegmentsLifetime();
return params.buildFromExisting()
.withDruidCluster(cluster)
.withSegmentRuleLookup(segmentRuleLookup)
.withRuleMap(databaseRuleCoordinator.getRuleMap())
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
}
},
new DruidMasterAssigner(DruidMaster.this),
new DruidMasterDropper(DruidMaster.this),
new DruidMasterRuleRunner(DruidMaster.this),
new DruidMasterCleanup(DruidMaster.this),
new DruidMasterBalancer(DruidMaster.this, new BalancerAnalyzer()),
new DruidMasterLogger()
)
@ -668,12 +678,13 @@ public class DruidMaster
@Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{
log.info("Issued merge requests for %,d segments", params.getMergedSegmentCount());
MasterStats stats = params.getMasterStats();
log.info("Issued merge requests for %s segments", stats.getGlobalStats().get("mergedCount").get());
params.getEmitter().emit(
new ServiceMetricEvent.Builder().build(
"master/merge/count",
params.getMergedSegmentCount()
stats.getGlobalStats().get("mergedCount")
)
);

View File

@ -1,92 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.google.common.collect.Lists;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.collect.CountingMap;
import com.metamx.druid.master.rules.Rule;
import com.metamx.druid.master.stats.AssignStat;
import com.metamx.emitter.EmittingLogger;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class DruidMasterAssigner implements DruidMasterHelper
{
private static final EmittingLogger log = new EmittingLogger(DruidMasterAssigner.class);
private final DruidMaster master;
public DruidMasterAssigner(
DruidMaster master
)
{
this.master = master;
}
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{
int unassignedCount = 0;
long unassignedSize = 0;
CountingMap<String> assignedCounts = new CountingMap<String>();
DruidCluster cluster = params.getDruidCluster();
if (cluster.isEmpty()) {
log.warn("Uh... I have no servers. Not assigning anything...");
return params;
}
// Assign unserviced segments to servers in order of most available space
for (DataSegment segment : params.getAvailableSegments()) {
Rule rule = params.getSegmentRuleLookup().lookup(segment.getIdentifier());
AssignStat stat = rule.runAssign(params, segment);
unassignedCount += stat.getUnassignedCount();
unassignedSize += stat.getUnassignedSize();
if (stat.getAssignedCount() != null) {
assignedCounts.add(stat.getAssignedCount().lhs, stat.getAssignedCount().rhs);
}
}
master.decrementRemovedSegmentsLifetime();
List<String> assignmentMsgs = Lists.newArrayList();
for (Map.Entry<String, AtomicLong> entry : assignedCounts.entrySet()) {
if (cluster.get(entry.getKey()) != null) {
assignmentMsgs.add(
String.format(
"[%s] : Assigned %s segments among %,d servers",
entry.getKey(), assignedCounts.get(entry.getKey()), cluster.get(entry.getKey()).size()
)
);
}
}
return params.buildFromExisting()
.withMessages(assignmentMsgs)
.withAssignedCount(assignedCounts)
.withUnassignedCount(unassignedCount)
.withUnassignedSize(unassignedSize)
.build();
}
}

View File

@ -85,7 +85,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
@Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{
Map<String, Integer> movedCounts = Maps.newHashMap();
MasterStats stats = new MasterStats();
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
params.getDruidCluster().getCluster().entrySet()) {
@ -149,22 +149,12 @@ public class DruidMasterBalancer implements DruidMasterHelper
analyzer.findSegmentsToMove(highestPercentUsedServer.getServer()),
params
);
}
List<String> moveMsgs = Lists.newArrayList();
for (Map.Entry<String, ConcurrentHashMap<String, BalancerSegmentHolder>> entry : currentlyMovingSegments.entrySet()) {
movedCounts.put(entry.getKey(), entry.getValue().size());
moveMsgs.add(
String.format(
"[%s] : Moved %,d segment(s)",
entry.getKey(), entry.getValue().size()
)
);
stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
}
return params.buildFromExisting()
.withMessages(moveMsgs)
.withMovedCount(movedCounts)
.withMasterStats(stats)
.build();
}

View File

@ -19,7 +19,6 @@
package com.metamx.druid.master;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.guava.Comparators;
@ -29,24 +28,19 @@ import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.collect.CountingMap;
import com.metamx.druid.master.rules.Rule;
import com.metamx.druid.master.stats.DropStat;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class DruidMasterDropper implements DruidMasterHelper
public class DruidMasterCleanup implements DruidMasterHelper
{
private static final Logger log = new Logger(DruidMasterDropper.class);
private static final Logger log = new Logger(DruidMasterCleanup.class);
private final DruidMaster master;
public DruidMasterDropper(
public DruidMasterCleanup(
DruidMaster master
)
{
@ -56,13 +50,11 @@ public class DruidMasterDropper implements DruidMasterHelper
@Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{
final CountingMap<String> droppedCounts = new CountingMap<String>();
int deletedCount = 0;
MasterStats stats = new MasterStats();
Set<DataSegment> availableSegments = params.getAvailableSegments();
DruidCluster cluster = params.getDruidCluster();
// Drop segments that are not needed
// Drop segments that no longer exist in the available segments configuration
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) {
for (ServerHolder serverHolder : serverHolders) {
DruidServer server = serverHolder.getServer();
@ -82,7 +74,7 @@ public class DruidMasterDropper implements DruidMasterHelper
}
}
);
droppedCounts.add(server.getTier(), 1);
stats.addToTieredStat("unneededCount", server.getTier(), 1);
}
}
}
@ -92,7 +84,7 @@ public class DruidMasterDropper implements DruidMasterHelper
// Delete segments that are old
// Unservice old partitions if we've had enough time to make sure we aren't flapping with old data
if (System.currentTimeMillis() - params.getStartTime() > params.getMillisToWaitBeforeDeleting()) {
if (params.hasDeletionWaitTimeElapsed()) {
Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = Maps.newHashMap();
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) {
@ -118,39 +110,25 @@ public class DruidMasterDropper implements DruidMasterHelper
for (VersionedIntervalTimeline<String, DataSegment> timeline : timelines.values()) {
for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) {
for (DataSegment dataSegment : holder.getObject().payloads()) {
log.info("Deleting[%s].", dataSegment);
master.removeSegment(dataSegment);
++deletedCount;
stats.addToGlobalStat("overShadowedCount", 1);
}
}
}
for (DataSegment segment : availableSegments) {
Rule rule = params.getSegmentRuleLookup().lookup(segment.getIdentifier());
DropStat stat = rule.runDrop(master, params, segment);
deletedCount += stat.getDeletedCount();
if (stat.getDroppedCount() != null) {
droppedCounts.putAll(stat.getDroppedCount());
}
}
}
List<String> dropMsgs = Lists.newArrayList();
for (Map.Entry<String, AtomicLong> entry : droppedCounts.entrySet()) {
dropMsgs.add(
String.format(
"[%s] : Dropped %s segments among %,d servers",
entry.getKey(), droppedCounts.get(entry.getKey()), cluster.get(entry.getKey()).size()
)
);
}
//List<String> removedMsgs = Lists.newArrayList();
//for (Map.Entry<String, AtomicLong> entry : unneededSegments.entrySet()) {
// removedMsgs.add(
// String.format(
// "[%s] : Removed %s unneeded segments among %,d servers",
// entry.getKey(), entry.getValue(), cluster.get(entry.getKey()).size()
// )
// );
//}
return params.buildFromExisting()
.withMessages(dropMsgs)
.withMessage(String.format("Deleted %,d segments", deletedCount))
.withDroppedCount(droppedCounts)
.withDeletedCount(deletedCount)
.withMasterStats(stats)
.build();
}
}

View File

@ -30,6 +30,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
@ -40,12 +41,86 @@ public class DruidMasterLogger implements DruidMasterHelper
@Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{
for (String msg : params.getMessages()) {
log.info(msg);
DruidCluster cluster = params.getDruidCluster();
MasterStats stats = params.getMasterStats();
ServiceEmitter emitter = params.getEmitter();
Map<String, AtomicLong> assigned = stats.getPerTierStats().get("assignedCount");
if (assigned != null) {
for (Map.Entry<String, AtomicLong> entry : assigned.entrySet()) {
log.info(
"[%s] : Assigned %s segments among %,d servers",
entry.getKey(), entry.getValue().get(), cluster.get(entry.getKey()).size()
);
}
}
Map<String, AtomicLong> unassigned = stats.getPerTierStats().get("unassignedCount");
if (unassigned != null) {
for (Map.Entry<String, AtomicLong> entry : unassigned.entrySet()) {
emitter.emit(
new ServiceMetricEvent.Builder().build(
String.format("master/%s/unassigned/count", entry.getKey()),
entry.getValue().get()
)
);
}
}
Map<String, AtomicLong> sizes = stats.getPerTierStats().get("unassignedSize");
if (sizes != null) {
for (Map.Entry<String, AtomicLong> entry : sizes.entrySet()) {
emitter.emit(
new ServiceMetricEvent.Builder().build(
String.format("master/%s/unassigned/size", entry.getKey()),
entry.getValue().get()
)
);
}
}
Map<String, AtomicLong> dropped = stats.getPerTierStats().get("droppedCount");
if (dropped != null) {
for (Map.Entry<String, AtomicLong> entry : dropped.entrySet()) {
log.info(
"[%s] : Dropped %s segments among %,d servers",
entry.getKey(), entry.getValue().get(), cluster.get(entry.getKey()).size()
);
}
}
emitter.emit(
new ServiceMetricEvent.Builder().build(
"master/deleted/count", stats.getGlobalStats().get("deletedCount")
)
);
Map<String, AtomicLong> unneeded = stats.getPerTierStats().get("unneededCount");
if (unneeded != null) {
for (Map.Entry<String, AtomicLong> entry : unneeded.entrySet()) {
log.info(
"[%s] : Removed %s unneeded segments among %,d servers",
entry.getKey(), entry.getValue().get(), cluster.get(entry.getKey()).size()
);
}
}
emitter.emit(
new ServiceMetricEvent.Builder().build(
"master/overShadowed/count", stats.getGlobalStats().get("overShadowedCount")
)
);
Map<String, AtomicLong> moved = stats.getPerTierStats().get("movedCount");
if (moved != null) {
for (Map.Entry<String, AtomicLong> entry : moved.entrySet()) {
log.info(
"[%s] : Moved %,d segment(s)",
entry.getKey(), entry.getValue().get()
);
}
}
log.info("Load Queues:");
DruidCluster cluster = params.getDruidCluster();
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) {
for (ServerHolder serverHolder : serverHolders) {
DruidServer server = serverHolder.getServer();
@ -71,8 +146,6 @@ public class DruidMasterLogger implements DruidMasterHelper
}
}
final ServiceEmitter emitter = params.getEmitter();
// Emit master metrics
final Set<Map.Entry<String, LoadQueuePeon>> peonEntries = params.getLoadManagementPeons().entrySet();
for (Map.Entry<String, LoadQueuePeon> entry : peonEntries) {
@ -97,8 +170,6 @@ public class DruidMasterLogger implements DruidMasterHelper
)
);
}
emitter.emit(new ServiceMetricEvent.Builder().build("master/unassigned/count", params.getUnassignedCount()));
emitter.emit(new ServiceMetricEvent.Builder().build("master/unassigned/size", params.getUnassignedSize()));
// Emit segment metrics
CountingMap<String> segmentSizes = new CountingMap<String>();

View File

@ -0,0 +1,81 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.rules.Rule;
import com.metamx.druid.master.rules.RuleMap;
import java.util.List;
/**
*/
public class DruidMasterRuleRunner implements DruidMasterHelper
{
private static final Logger log = new Logger(DruidMasterRuleRunner.class);
private final DruidMaster master;
public DruidMasterRuleRunner(DruidMaster master)
{
this.master = master;
}
@Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{
MasterStats stats = new MasterStats();
DruidCluster cluster = params.getDruidCluster();
if (cluster.isEmpty()) {
log.warn("Uh... I have no servers. Not assigning anything...");
return params;
}
// Run through all matched rules for available segments
RuleMap ruleMap = params.getRuleMap();
for (DataSegment segment : params.getAvailableSegments()) {
List<Rule> rules = ruleMap.getRules(segment.getDataSource());
if (rules == null) {
throw new ISE("No rules found for segment[%s]", segment.getIdentifier());
}
boolean foundMatchingRule = false;
for (Rule rule : rules) {
if (rule.appliesTo(segment)) {
stats.accumulate(rule.run(master, params, segment));
foundMatchingRule = true;
break;
}
}
if (!foundMatchingRule) {
throw new ISE("Unable to find a matching rule for segment[%s]", segment.getIdentifier());
}
}
return params.buildFromExisting()
.withMasterStats(stats)
.build();
}
}
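The loop above is first-match-wins: each available segment is handled by the first rule whose appliesTo() returns true, and an unmatched segment is a hard error. A standalone sketch of that selection (hypothetical stand-ins, not the Druid interfaces):

import java.util.List;

// Hypothetical stand-in for the first-match selection above; not the Druid Rule interface.
interface AppliesTo
{
  boolean appliesTo(String segmentId);
}

final class FirstMatch
{
  static AppliesTo pick(List<AppliesTo> rules, String segmentId)
  {
    for (AppliesTo rule : rules) {
      if (rule.appliesTo(segmentId)) {
        return rule; // first matching rule wins; later rules are never consulted
      }
    }
    throw new IllegalStateException("Unable to find a matching rule for segment[" + segmentId + "]");
  }
}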

View File

@ -19,18 +19,17 @@
package com.metamx.druid.master;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.common.guava.Comparators;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.collect.CountingMap;
import com.metamx.druid.db.DatabaseRuleCoordinator;
import com.metamx.druid.master.rules.RuleMap;
import com.metamx.emitter.service.ServiceEmitter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -40,65 +39,44 @@ public class DruidMasterRuntimeParams
{
private final long startTime;
private final DruidCluster druidCluster;
private final SegmentRuleLookup segmentRuleLookup;
private final RuleMap ruleMap;
private final SegmentReplicantLookup segmentReplicantLookup;
private final Set<DruidDataSource> dataSources;
private final Set<DataSegment> availableSegments;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private final ServiceEmitter emitter;
private final long millisToWaitBeforeDeleting;
private final List<String> messages;
private final CountingMap<String> assignedCount;
private final CountingMap<String> droppedCount;
private final int deletedCount;
private final int unassignedCount;
private final long unassignedSize;
private final Map<String, Integer> movedCount;
private final MasterStats stats;
private final long mergeBytesLimit;
private final int mergeSegmentsLimit;
private final int mergedSegmentCount;
public DruidMasterRuntimeParams(
long startTime,
DruidCluster druidCluster,
SegmentRuleLookup segmentRuleLookup,
RuleMap ruleMap,
SegmentReplicantLookup segmentReplicantLookup,
Set<DruidDataSource> dataSources,
Set<DataSegment> availableSegments,
Map<String, LoadQueuePeon> loadManagementPeons,
ServiceEmitter emitter,
long millisToWaitBeforeDeleting,
List<String> messages,
CountingMap<String> assignedCount,
CountingMap<String> droppedCount,
int deletedCount,
int unassignedCount,
long unassignedSize,
Map<String, Integer> movedCount,
MasterStats stats,
long mergeBytesLimit,
int mergeSegmentsLimit,
int mergedSegmentCount
int mergeSegmentsLimit
)
{
this.startTime = startTime;
this.druidCluster = druidCluster;
this.segmentRuleLookup = segmentRuleLookup;
this.ruleMap = ruleMap;
this.segmentReplicantLookup = segmentReplicantLookup;
this.dataSources = dataSources;
this.availableSegments = availableSegments;
this.loadManagementPeons = loadManagementPeons;
this.emitter = emitter;
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
this.messages = messages;
this.assignedCount = assignedCount;
this.droppedCount = droppedCount;
this.deletedCount = deletedCount;
this.unassignedCount = unassignedCount;
this.unassignedSize = unassignedSize;
this.movedCount = movedCount;
this.stats = stats;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.mergedSegmentCount = mergedSegmentCount;
}
public long getStartTime()
@ -111,9 +89,9 @@ public class DruidMasterRuntimeParams
return druidCluster;
}
public SegmentRuleLookup getSegmentRuleLookup()
public RuleMap getRuleMap()
{
return segmentRuleLookup;
return ruleMap;
}
public SegmentReplicantLookup getSegmentReplicantLookup()
@ -146,39 +124,9 @@ public class DruidMasterRuntimeParams
return millisToWaitBeforeDeleting;
}
public List<String> getMessages()
public MasterStats getMasterStats()
{
return messages;
}
public CountingMap<String> getAssignedCount()
{
return assignedCount;
}
public CountingMap<String> getDroppedCount()
{
return droppedCount;
}
public int getDeletedCount()
{
return deletedCount;
}
public int getUnassignedCount()
{
return unassignedCount;
}
public long getUnassignedSize()
{
return unassignedSize;
}
public Map<String, Integer> getMovedCount()
{
return movedCount;
return stats;
}
public long getMergeBytesLimit()
@ -191,9 +139,9 @@ public class DruidMasterRuntimeParams
return mergeSegmentsLimit;
}
public int getMergedSegmentCount()
public boolean hasDeletionWaitTimeElapsed()
{
return mergedSegmentCount;
return (System.currentTimeMillis() - getStartTime() > getMillisToWaitBeforeDeleting());
}
public static Builder newBuilder()
@ -206,23 +154,16 @@ public class DruidMasterRuntimeParams
return new Builder(
startTime,
druidCluster,
segmentRuleLookup,
ruleMap,
segmentReplicantLookup,
dataSources,
availableSegments,
loadManagementPeons,
messages,
emitter,
millisToWaitBeforeDeleting,
assignedCount,
droppedCount,
deletedCount,
unassignedCount,
unassignedSize,
movedCount,
stats,
mergeBytesLimit,
mergeSegmentsLimit,
mergedSegmentCount
mergeSegmentsLimit
);
}
@ -230,88 +171,60 @@ public class DruidMasterRuntimeParams
{
private long startTime;
private DruidCluster druidCluster;
private SegmentRuleLookup segmentRuleLookup;
private RuleMap ruleMap;
private SegmentReplicantLookup segmentReplicantLookup;
private final Set<DruidDataSource> dataSources;
private final Set<DataSegment> availableSegments;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private final List<String> messages;
private long millisToWaitBeforeDeleting;
private ServiceEmitter emitter;
private CountingMap<String> assignedCount;
private CountingMap<String> droppedCount;
private int deletedCount;
private int unassignedCount;
private long unassignedSize;
private Map<String, Integer> movedCount;
private long millisToWaitBeforeDeleting;
private MasterStats stats;
private long mergeBytesLimit;
private int mergeSegmentsLimit;
private int mergedSegmentCount;
Builder()
{
this.startTime = 0;
this.druidCluster = null;
this.segmentRuleLookup = null;
this.ruleMap = null;
this.segmentReplicantLookup = null;
this.dataSources = Sets.newHashSet();
this.availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
this.loadManagementPeons = Maps.newHashMap();
this.messages = Lists.newArrayList();
this.emitter = null;
this.millisToWaitBeforeDeleting = 0;
this.assignedCount = new CountingMap<String>();
this.droppedCount = new CountingMap<String>();
this.deletedCount = 0;
this.unassignedCount = 0;
this.unassignedSize = 0;
this.movedCount = Maps.newHashMap();
this.stats = new MasterStats();
this.mergeBytesLimit = 0;
this.mergeSegmentsLimit = 0;
this.mergedSegmentCount = 0;
}
Builder(
long startTime,
DruidCluster cluster,
SegmentRuleLookup segmentRuleLookup,
RuleMap ruleMap,
SegmentReplicantLookup segmentReplicantLookup,
Set<DruidDataSource> dataSources,
Set<DataSegment> availableSegments,
Map<String, LoadQueuePeon> loadManagementPeons,
List<String> messages,
ServiceEmitter emitter,
long millisToWaitBeforeDeleting,
CountingMap<String> assignedCount,
CountingMap<String> droppedCount,
int deletedCount,
int unassignedCount,
long unassignedSize,
Map<String, Integer> movedCount,
MasterStats stats,
long mergeBytesLimit,
int mergeSegmentsLimit,
int mergedSegmentCount
int mergeSegmentsLimit
)
{
this.startTime = startTime;
this.druidCluster = cluster;
this.segmentRuleLookup = segmentRuleLookup;
this.ruleMap = ruleMap;
this.segmentReplicantLookup = segmentReplicantLookup;
this.dataSources = dataSources;
this.availableSegments = availableSegments;
this.loadManagementPeons = loadManagementPeons;
this.messages = messages;
this.emitter = emitter;
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
this.assignedCount = assignedCount;
this.droppedCount = droppedCount;
this.deletedCount = deletedCount;
this.unassignedCount = unassignedCount;
this.unassignedSize = unassignedSize;
this.movedCount = movedCount;
this.stats = stats;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.mergedSegmentCount = mergedSegmentCount;
}
public DruidMasterRuntimeParams build()
@ -319,23 +232,16 @@ public class DruidMasterRuntimeParams
return new DruidMasterRuntimeParams(
startTime,
druidCluster,
segmentRuleLookup,
ruleMap,
segmentReplicantLookup,
dataSources,
availableSegments,
loadManagementPeons,
emitter,
millisToWaitBeforeDeleting,
messages,
assignedCount,
droppedCount,
deletedCount,
unassignedCount,
unassignedSize,
movedCount,
stats,
mergeBytesLimit,
mergeSegmentsLimit,
mergedSegmentCount
mergeSegmentsLimit
);
}
@ -351,9 +257,9 @@ public class DruidMasterRuntimeParams
return this;
}
public Builder withSegmentRuleLookup(SegmentRuleLookup segmentRuleLookup)
public Builder withRuleMap(RuleMap ruleMap)
{
this.segmentRuleLookup = segmentRuleLookup;
this.ruleMap = ruleMap;
return this;
}
@ -381,18 +287,6 @@ public class DruidMasterRuntimeParams
return this;
}
public Builder withMessage(String message)
{
messages.add(message);
return this;
}
public Builder withMessages(List<String> messagesCollection)
{
messages.addAll(Collections.unmodifiableList(messagesCollection));
return this;
}
public Builder withEmitter(ServiceEmitter emitter)
{
this.emitter = emitter;
@ -405,39 +299,9 @@ public class DruidMasterRuntimeParams
return this;
}
public Builder withAssignedCount(CountingMap<String> assignedCount)
public Builder withMasterStats(MasterStats stats)
{
this.assignedCount.putAll(assignedCount);
return this;
}
public Builder withDroppedCount(CountingMap<String> droppedCount)
{
this.droppedCount.putAll(droppedCount);
return this;
}
public Builder withDeletedCount(int deletedCount)
{
this.deletedCount = deletedCount;
return this;
}
public Builder withUnassignedCount(int unassignedCount)
{
this.unassignedCount = unassignedCount;
return this;
}
public Builder withUnassignedSize(long unassignedSize)
{
this.unassignedSize = unassignedSize;
return this;
}
public Builder withMovedCount(Map<String, Integer> movedCount)
{
this.movedCount.putAll(movedCount);
this.stats.accumulate(stats);
return this;
}
@ -452,11 +316,5 @@ public class DruidMasterRuntimeParams
this.mergeSegmentsLimit = mergeSegmentsLimit;
return this;
}
public Builder withMergedSegmentCount(int mergedSegmentCount)
{
this.mergedSegmentCount = mergedSegmentCount;
return this;
}
}
}

View File

@ -77,8 +77,8 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
@Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{
MasterStats stats = new MasterStats();
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources = Maps.newHashMap();
int count = 0;
// Find serviced segments by using a timeline
for (DataSegment dataSegment : params.getAvailableSegments()) {
@ -114,7 +114,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
i -= segmentsToMerge.backtrack(params.getMergeBytesLimit());
if (segmentsToMerge.getSegmentCount() > 1) {
count += mergeSegments(segmentsToMerge, entry.getKey());
stats.addToGlobalStat("mergedCount", mergeSegments(segmentsToMerge, entry.getKey()));
}
if (segmentsToMerge.getSegmentCount() == 0) {
@ -129,12 +129,12 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
// Finish any timelineObjects to merge that may have not hit threshold
segmentsToMerge.backtrack(params.getMergeBytesLimit());
if (segmentsToMerge.getSegmentCount() > 1) {
count += mergeSegments(segmentsToMerge, entry.getKey());
stats.addToGlobalStat("mergedCount", mergeSegments(segmentsToMerge, entry.getKey()));
}
}
return params.buildFromExisting()
.withMergedSegmentCount(count)
.withMasterStats(stats)
.build();
}

View File

@ -0,0 +1,83 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.google.common.collect.Maps;
import com.metamx.druid.collect.CountingMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class MasterStats
{
private final Map<String, CountingMap<String>> perTierStats;
private final CountingMap<String> globalStats;
public MasterStats()
{
perTierStats = Maps.newHashMap();
globalStats = new CountingMap<String>();
}
public Map<String, CountingMap<String>> getPerTierStats()
{
return perTierStats;
}
public CountingMap<String> getGlobalStats()
{
return globalStats;
}
public void addToTieredStat(String statName, String tier, long value)
{
CountingMap<String> theStat = perTierStats.get(statName);
if (theStat == null) {
theStat = new CountingMap<String>();
perTierStats.put(statName, theStat);
}
theStat.add(tier, value);
}
public void addToGlobalStat(String statName, long value)
{
globalStats.add(statName, value);
}
public MasterStats accumulate(MasterStats stats)
{
for (Map.Entry<String, CountingMap<String>> entry : stats.perTierStats.entrySet()) {
CountingMap<String> theStat = perTierStats.get(entry.getKey());
if (theStat == null) {
theStat = new CountingMap<String>();
perTierStats.put(entry.getKey(), theStat);
}
for (Map.Entry<String, AtomicLong> tiers : entry.getValue().entrySet()) {
theStat.add(tiers.getKey(), tiers.getValue().get());
}
}
for (Map.Entry<String, AtomicLong> entry : stats.globalStats.entrySet()) {
globalStats.add(entry.getKey(), entry.getValue().get());
}
return this;
}
}
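A quick usage sketch (not part of this commit; assumes the Druid classes above are on the classpath and that CountingMap behaves as a Map<String, AtomicLong>, as its use in DruidMasterLogger suggests). Each helper builds its own MasterStats, and accumulate() folds them together:

// Tier name "hot" and all counts are illustrative.
MasterStats ruleStats = new MasterStats();
ruleStats.addToTieredStat("assignedCount", "hot", 3);
ruleStats.addToGlobalStat("deletedCount", 1);

MasterStats balancerStats = new MasterStats();
balancerStats.addToTieredStat("movedCount", "hot", 2);

// accumulate() unions the per-tier maps and sums counters for matching keys,
// which is what Builder.withMasterStats() relies on to merge helper results.
MasterStats total = ruleStats.accumulate(balancerStats);
long moved = total.getPerTierStats().get("movedCount").get("hot").get(); // 2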

View File

@ -68,17 +68,12 @@ public class SegmentReplicantLookup
private final Table<String, String, Integer> table;
public SegmentReplicantLookup()
{
this.table = HashBasedTable.create();
}
public SegmentReplicantLookup(Table<String, String, Integer> table)
private SegmentReplicantLookup(Table<String, String, Integer> table)
{
this.table = table;
}
public Map<String, Integer> gettiers(String segmentId)
public Map<String, Integer> getTiers(String segmentId)
{
Map<String, Integer> retVal = table.row(segmentId);
return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;

View File

@ -1,75 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.rules.Rule;
import com.metamx.druid.master.rules.RuleMap;
import java.util.Map;
/**
*/
public class SegmentRuleLookup
{
// Create a mapping between a segment and a rule, a segment is paired with the first rule it matches
public static SegmentRuleLookup make(
Iterable<DataSegment> segments,
RuleMap ruleMap
)
{
Map<String, Rule> lookup = Maps.newHashMap();
for (DataSegment segment : segments) {
for (Rule rule : ruleMap.getRules(segment.getDataSource())) {
if (rule.appliesTo(segment)) {
lookup.put(segment.getIdentifier(), rule);
break;
}
}
if (!lookup.containsKey(segment.getIdentifier())) {
throw new ISE("Unable to find a rule for [%s]!!!", segment.getIdentifier());
}
}
return new SegmentRuleLookup(lookup);
}
private final Map<String, Rule> lookup;
public SegmentRuleLookup()
{
this.lookup = Maps.newHashMap();
}
public SegmentRuleLookup(
Map<String, Rule> lookup
)
{
this.lookup = lookup;
}
public Rule lookup(String s)
{
return lookup.get(s);
}
}

View File

@ -22,8 +22,7 @@ package com.metamx.druid.master.rules;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.stats.AssignStat;
import com.metamx.druid.master.stats.DropStat;
import com.metamx.druid.master.MasterStats;
/**
* DropRules indicate when segments should be completely removed from the cluster.
@ -31,15 +30,11 @@ import com.metamx.druid.master.stats.DropStat;
public abstract class DropRule implements Rule
{
@Override
public AssignStat runAssign(DruidMasterRuntimeParams params, DataSegment segment)
{
return new AssignStat(null, 0, 0);
}
@Override
public DropStat runDrop(DruidMaster master, DruidMasterRuntimeParams params, DataSegment segment)
public MasterStats run(DruidMaster master, DruidMasterRuntimeParams params, DataSegment segment)
{
MasterStats stats = new MasterStats();
master.removeSegment(segment);
return new DropStat(null, 1);
stats.addToGlobalStat("deletedCount", 1);
return stats;
}
}

View File

@ -32,18 +32,18 @@ public class IntervalLoadRule extends LoadRule
private static final Logger log = new Logger(IntervalLoadRule.class);
private final Interval interval;
private final Integer replicationFactor;
private final Integer replicants;
private final String tier;
@JsonCreator
public IntervalLoadRule(
@JsonProperty("interval") Interval interval,
@JsonProperty("replicationFactor") Integer replicationFactor,
@JsonProperty("replicants") Integer replicants,
@JsonProperty("tier") String tier
)
{
this.interval = interval;
this.replicationFactor = (replicationFactor == null) ? 2 : replicationFactor; //TODO:config
this.replicants = (replicants == null) ? 2 : replicants;
this.tier = tier;
}
@ -56,20 +56,20 @@ public class IntervalLoadRule extends LoadRule
@Override
@JsonProperty
public int getReplicationFactor()
public int getReplicants()
{
return replicationFactor;
return replicants;
}
@Override
public int getReplicationFactor(String tier)
public int getReplicants(String tier)
{
return (this.tier.equalsIgnoreCase(tier)) ? replicationFactor : 0;
return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0;
}
@Override
@JsonProperty
public String gettier()
public String getTier()
{
return tier;
}
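For reference, a load rule payload after this change carries replicants and tier instead of replicationFactor. A hypothetical example (the type name and all values are assumptions; the JsonSubTypes registration isn't shown in this diff):

{
  "type": "loadByInterval",
  "interval": "2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z",
  "replicants": 2,
  "tier": "hot"
}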

View File

@ -22,17 +22,14 @@ package com.metamx.druid.master.rules;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.Pair;
import com.metamx.common.ISE;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.collect.CountingMap;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.LoadPeonCallback;
import com.metamx.druid.master.MasterStats;
import com.metamx.druid.master.ServerHolder;
import com.metamx.druid.master.stats.AssignStat;
import com.metamx.druid.master.stats.DropStat;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.AlertEvent;
import java.util.List;
import java.util.Map;
@ -45,26 +42,39 @@ public abstract class LoadRule implements Rule
private static final EmittingLogger log = new EmittingLogger(LoadRule.class);
@Override
public AssignStat runAssign(DruidMasterRuntimeParams params, DataSegment segment)
public MasterStats run(DruidMaster master, DruidMasterRuntimeParams params, DataSegment segment)
{
int assignedCount = 0;
int unassignedCount = 0;
long unassignedSize = 0;
MasterStats stats = new MasterStats();
int expectedReplicants = getReplicationFactor();
int actualReplicants = params.getSegmentReplicantLookup().lookup(segment.getIdentifier(), gettier());
int expectedReplicants = getReplicants();
int actualReplicants = params.getSegmentReplicantLookup().lookup(segment.getIdentifier(), getTier());
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(gettier());
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(getTier());
if (serverQueue == null) {
log.makeAlert("No holders found for tier[%s]", gettier()).emit();
return new AssignStat(new Pair<String, Integer>(gettier(), 0), 0, 0);
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", getTier()).emit();
throw new ISE("Tier[%s] has no servers! Check your cluster configuration!", getTier());
}
stats.accumulate(assign(expectedReplicants, actualReplicants, serverQueue, segment));
stats.accumulate(drop(expectedReplicants, actualReplicants, segment, params));
return stats;
}
private MasterStats assign(
int expectedReplicants,
int actualReplicants,
MinMaxPriorityQueue<ServerHolder> serverQueue,
DataSegment segment
)
{
MasterStats stats = new MasterStats();
List<ServerHolder> assignedServers = Lists.newArrayList();
while (actualReplicants < expectedReplicants) {
ServerHolder holder = serverQueue.pollFirst();
if (holder == null) {
log.warn("Not enough %s servers[%d] to assign segments!!!", gettier(), serverQueue.size());
log.warn("Not enough %s servers[%d] to assign segments!!!", getTier(), serverQueue.size());
break;
}
if (holder.containsSegment(segment)) {
@ -78,19 +88,17 @@ public abstract class LoadRule implements Rule
holder.getAvailableSize(),
segment
);
params.getEmitter().emit(
new AlertEvent.Builder().build(
"Not enough node capacity",
ImmutableMap.<String, Object>builder()
.put("segmentSkipped", segment.toString())
.put("closestNode", holder.getServer().toString())
.put("availableSize", holder.getAvailableSize())
.build()
)
);
log.makeAlert(
"Not enough node capacity",
ImmutableMap.<String, Object>builder()
.put("segmentSkipped", segment.toString())
.put("closestNode", holder.getServer().toString())
.put("availableSize", holder.getAvailableSize())
.build()
).emit();
serverQueue.add(holder);
unassignedCount++;
unassignedSize += segment.getSize();
stats.addToTieredStat("unassignedCount", getTier(), 1);
stats.addToTieredStat("unassignedSize", getTier(), segment.getSize());
break;
}
@ -106,39 +114,43 @@ public abstract class LoadRule implements Rule
);
assignedServers.add(holder);
++assignedCount;
stats.addToTieredStat("assignedCount", getTier(), 1);
++actualReplicants;
}
serverQueue.addAll(assignedServers);
return new AssignStat(new Pair<String, Integer>(gettier(), assignedCount), unassignedCount, unassignedSize);
return stats;
}
@Override
public DropStat runDrop(DruidMaster master, DruidMasterRuntimeParams params, DataSegment segment)
private MasterStats drop(
int expectedReplicants,
int actualReplicants,
DataSegment segment,
DruidMasterRuntimeParams params
)
{
CountingMap<String> droppedCounts = new CountingMap<String>();
int expectedNumReplicants = getReplicationFactor();
int actualNumReplicants = params.getSegmentReplicantLookup().lookup(
segment.getIdentifier(),
gettier()
);
MasterStats stats = new MasterStats();
if (actualNumReplicants < expectedNumReplicants) {
return new DropStat(droppedCounts, 0);
if (!params.hasDeletionWaitTimeElapsed()) {
return stats;
}
Map<String, Integer> replicantsByType = params.getSegmentReplicantLookup().gettiers(segment.getIdentifier());
// Make sure we have enough actual replicants in the cluster before doing anything
if (actualReplicants < expectedReplicants) {
return stats;
}
Map<String, Integer> replicantsByType = params.getSegmentReplicantLookup().getTiers(segment.getIdentifier());
for (Map.Entry<String, Integer> entry : replicantsByType.entrySet()) {
String tier = entry.getKey();
int actualNumReplicantsForType = entry.getValue();
int expectedNumReplicantsForType = getReplicationFactor(tier);
int expectedNumReplicantsForType = getReplicants(tier);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
if (serverQueue == null) {
log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit();
return new DropStat(droppedCounts, 0);
return stats;
}
List<ServerHolder> droppedServers = Lists.newArrayList();
@ -161,17 +173,17 @@ public abstract class LoadRule implements Rule
);
droppedServers.add(holder);
--actualNumReplicantsForType;
droppedCounts.add(tier, 1);
stats.addToTieredStat("droppedCount", tier, 1);
}
serverQueue.addAll(droppedServers);
}
return new DropStat(droppedCounts, 0);
return stats;
}
public abstract int getReplicationFactor();
public abstract int getReplicants();
public abstract int getReplicationFactor(String tier);
public abstract int getReplicants(String tier);
public abstract String gettier();
public abstract String getTier();
}
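Net effect of the unified run(): with getReplicants() == 2 on a tier holding one copy, assign() loads one more replicant and records assignedCount for that tier, while drop() stays a no-op until the deletion wait has elapsed and the cluster actually holds at least the expected number of replicants, at which point surplus copies are dropped per tier.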

View File

@ -34,18 +34,18 @@ public class PeriodLoadRule extends LoadRule
private static final Logger log = new Logger(PeriodLoadRule.class);
private final Period period;
private final Integer replicationFactor;
private final Integer replicants;
private final String tier;
@JsonCreator
public PeriodLoadRule(
@JsonProperty("period") String period,
@JsonProperty("replicationFactor") Integer replicationFactor,
@JsonProperty("replicants") Integer replicants,
@JsonProperty("tier") String tier
)
{
this.period = (period == null || period.equalsIgnoreCase("all")) ? new Period("P1000Y") : new Period(period);
this.replicationFactor = (replicationFactor == null) ? 2 : replicationFactor; //TODO:config
this.period = (period == null || period.equalsIgnoreCase("all")) ? new Period("P5000Y") : new Period(period);
this.replicants = (replicants == null) ? 2 : replicants;
this.tier = tier;
}
@ -63,19 +63,19 @@ public class PeriodLoadRule extends LoadRule
}
@JsonProperty
public int getReplicationFactor()
public int getReplicants()
{
return replicationFactor;
return replicants;
}
@Override
public int getReplicationFactor(String tier)
public int getReplicants(String tier)
{
return (this.tier.equalsIgnoreCase(tier)) ? replicationFactor : 0;
return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0;
}
@JsonProperty
public String gettier()
public String getTier()
{
return tier;
}

View File

@ -22,8 +22,7 @@ package com.metamx.druid.master.rules;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.stats.AssignStat;
import com.metamx.druid.master.stats.DropStat;
import com.metamx.druid.master.MasterStats;
import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonTypeInfo;
@ -43,7 +42,5 @@ public interface Rule
public boolean appliesTo(DataSegment segment);
public AssignStat runAssign(DruidMasterRuntimeParams params, DataSegment segment);
public DropStat runDrop(DruidMaster master, DruidMasterRuntimeParams params, DataSegment segment);
public MasterStats run(DruidMaster master, DruidMasterRuntimeParams params, DataSegment segment);
}

View File

@ -1,53 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master.stats;
import com.metamx.common.Pair;
/**
*/
public class AssignStat
{
private final Pair<String, Integer> assignedCount;
private final int unassignedCount;
private final long unassignedSize;
public AssignStat(Pair<String, Integer> assignedCount, int unassignedCount, long unassignedSize)
{
this.assignedCount = assignedCount;
this.unassignedCount = unassignedCount;
this.unassignedSize = unassignedSize;
}
public Pair<String, Integer> getAssignedCount()
{
return assignedCount;
}
public int getUnassignedCount()
{
return unassignedCount;
}
public long getUnassignedSize()
{
return unassignedSize;
}
}

View File

@ -1,47 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master.stats;
import com.metamx.common.Pair;
import com.metamx.druid.collect.CountingMap;
/**
*/
public class DropStat
{
private final CountingMap<String> droppedCount;
private final int deletedCount;
public DropStat(CountingMap<String> droppedCount, int deletedCount)
{
this.droppedCount = droppedCount;
this.deletedCount = deletedCount;
}
public CountingMap<String> getDroppedCount()
{
return droppedCount;
}
public int getDeletedCount()
{
return deletedCount;
}
}

View File

@ -352,9 +352,6 @@ public class DruidSetup
final DbConnectorConfig config = new DbConnectorConfig()
{
{
}
@Override
public String getDatabaseConnectURI()
{
@ -378,18 +375,12 @@ public class DruidSetup
{
return tableName;
}
@Override
public String getRuleTable()
{
return ruleTableName;
}
};
DbConnector dbConnector = new DbConnector(config);
DbConnector.createSegmentTable(dbConnector.getDBI(), config);
DbConnector.createSegmentTable(dbConnector.getDBI(), tableName);
DbConnector.createRuleTable(dbConnector.getDBI(), ruleTableName);
}
/**

View File

@ -33,8 +33,8 @@ public class ZkSetup
{
public static void main(final String[] args)
{
if (args.length != 6) {
System.out.println("Usage: <java invocation> zkConnect baseZkPath dbConnectionUrl dbUsername:password tableName ruleTableName");
if (args.length != 5) {
System.out.println("Usage: <java invocation> zkConnect baseZkPath dbConnectionUrl dbUsername:password tableName");
System.out.println("This utility is deprecated, see DruidSetup instead.");
System.exit(1);
}
@ -96,16 +96,10 @@ public class ZkSetup
{
return args[4];
}
@Override
public String getRuleTable()
{
return args[5];
}
};
DbConnector dbConnector = new DbConnector(config);
DbConnector.createSegmentTable(dbConnector.getDBI(), config);
DbConnector.createSegmentTable(dbConnector.getDBI(), args[4]);
}
}

View File

@ -108,7 +108,7 @@ jQuery.fn = jQuery.prototype = {
}
// Handle $(DOMElement)
if ( selector.tier ) {
if ( selector.nodeType ) {
this.context = this[0] = selector;
this.length = 1;
return this;
@ -518,7 +518,7 @@ jQuery.extend({
// Must be an Object.
// Because of IE, we also have to check the presence of the constructor property.
// Make sure that DOM nodes and window objects don't pass through, as well
if ( !obj || jQuery.type(obj) !== "object" || obj.tier || jQuery.isWindow( obj ) ) {
if ( !obj || jQuery.type(obj) !== "object" || obj.nodeType || jQuery.isWindow( obj ) ) {
return false;
}
@ -1387,7 +1387,7 @@ jQuery.support = (function() {
support = {
// IE strips leading whitespace when .innerHTML is used
leadingWhitespace: ( div.firstChild.tier === 3 ),
leadingWhitespace: ( div.firstChild.nodeType === 3 ),
// Make sure that tbody elements aren't automatically inserted
// IE will insert them into empty tables
@ -1693,7 +1693,7 @@ jQuery.extend({
},
hasData: function( elem ) {
elem = elem.tier ? jQuery.cache[ elem[jQuery.expando] ] : elem[ jQuery.expando ];
elem = elem.nodeType ? jQuery.cache[ elem[jQuery.expando] ] : elem[ jQuery.expando ];
return !!elem && !isEmptyDataObject( elem );
},
@ -1708,7 +1708,7 @@ jQuery.extend({
// We have to handle DOM nodes and JS objects differently because IE6-7
// can't GC object references properly across the DOM-JS boundary
isNode = elem.tier,
isNode = elem.nodeType,
// Only DOM nodes need the global jQuery cache; JS object data is
// attached directly to the object so GC can occur automatically
@ -1808,7 +1808,7 @@ jQuery.extend({
// Reference to internal data cache key
internalKey = jQuery.expando,
isNode = elem.tier,
isNode = elem.nodeType,
// See jQuery.data for more information
cache = isNode ? jQuery.cache : elem,
@ -1921,7 +1921,7 @@ jQuery.fn.extend({
if ( this.length ) {
data = jQuery.data( this[0] );
if ( this[0].tier === 1 && !jQuery._data( this[0], "parsedAttrs" ) ) {
if ( this[0].nodeType === 1 && !jQuery._data( this[0], "parsedAttrs" ) ) {
attr = this[0].attributes;
for ( var i = 0, l = attr.length; i < l; i++ ) {
name = attr[i].name;
@ -1982,7 +1982,7 @@ jQuery.fn.extend({
function dataAttr( elem, key, data ) {
// If nothing was found internally, try to fetch any
// data from the HTML5 data-* attribute
if ( data === undefined && elem.tier === 1 ) {
if ( data === undefined && elem.nodeType === 1 ) {
var name = "data-" + key.replace( rmultiDash, "-$1" ).toLowerCase();
@ -2255,7 +2255,7 @@ jQuery.fn.extend({
for ( i = 0, l = this.length; i < l; i++ ) {
elem = this[ i ];
if ( elem.tier === 1 ) {
if ( elem.nodeType === 1 ) {
if ( !elem.className && classNames.length === 1 ) {
elem.className = value;
@ -2291,7 +2291,7 @@ jQuery.fn.extend({
for ( i = 0, l = this.length; i < l; i++ ) {
elem = this[ i ];
if ( elem.tier === 1 && elem.className ) {
if ( elem.nodeType === 1 && elem.className ) {
if ( value ) {
className = (" " + elem.className + " ").replace( rclass, " " );
for ( c = 0, cl = classNames.length; c < cl; c++ ) {
@ -2351,7 +2351,7 @@ jQuery.fn.extend({
i = 0,
l = this.length;
for ( ; i < l; i++ ) {
if ( this[i].tier === 1 && (" " + this[i].className + " ").replace(rclass, " ").indexOf( className ) > -1 ) {
if ( this[i].nodeType === 1 && (" " + this[i].className + " ").replace(rclass, " ").indexOf( className ) > -1 ) {
return true;
}
}
@ -2388,7 +2388,7 @@ jQuery.fn.extend({
return this.each(function( i ) {
var self = jQuery(this), val;
if ( this.tier !== 1 ) {
if ( this.nodeType !== 1 ) {
return;
}
@ -2501,7 +2501,7 @@ jQuery.extend({
attr: function( elem, name, value, pass ) {
var ret, hooks, notxml,
nType = elem.tier;
nType = elem.nodeType;
// don't get/set attributes on text, comment and attribute nodes
if ( !elem || nType === 3 || nType === 8 || nType === 2 ) {
@ -2558,7 +2558,7 @@ jQuery.extend({
var propName, attrNames, name, l,
i = 0;
if ( elem.tier === 1 ) {
if ( elem.nodeType === 1 ) {
attrNames = ( value || "" ).split( rspace );
l = attrNames.length;
@ -2635,7 +2635,7 @@ jQuery.extend({
prop: function( elem, name, value ) {
var ret, hooks, notxml,
nType = elem.tier;
nType = elem.nodeType;
// don't get/set properties on text, comment and attribute nodes
if ( !elem || nType === 3 || nType === 8 || nType === 2 ) {
@ -2896,7 +2896,7 @@ jQuery.event = {
handleObjIn, quick, handlers, special;
// Don't attach events to noData or text/comment nodes (allow plain objects tho)
if ( elem.tier === 3 || elem.tier === 8 || !types || !handler || !(elemData = jQuery._data( elem )) ) {
if ( elem.nodeType === 3 || elem.nodeType === 8 || !types || !handler || !(elemData = jQuery._data( elem )) ) {
return;
}
@ -3101,7 +3101,7 @@ jQuery.event = {
trigger: function( event, data, elem, onlyHandlers ) {
// Don't do events on text and comment nodes
if ( elem && (elem.tier === 3 || elem.tier === 8) ) {
if ( elem && (elem.nodeType === 3 || elem.nodeType === 8) ) {
return;
}
@ -3407,7 +3407,7 @@ jQuery.event = {
}
// Target should not be a text node (#504, Safari)
if ( event.target.tier === 3 ) {
if ( event.target.nodeType === 3 ) {
event.target = event.target.parentNode;
}
@ -3944,7 +3944,7 @@ var Sizzle = function( selector, context, results, seed ) {
var origContext = context;
if ( context.tier !== 1 && context.tier !== 9 ) {
if ( context.nodeType !== 1 && context.nodeType !== 9 ) {
return [];
}
@ -3999,7 +3999,7 @@ var Sizzle = function( selector, context, results, seed ) {
} else {
// Take a shortcut and set the context if the root selector is an ID
// (but not if it'll be faster if the inner selector is an ID)
if ( !seed && parts.length > 1 && context.tier === 9 && !contextXML &&
if ( !seed && parts.length > 1 && context.nodeType === 9 && !contextXML &&
Expr.match.ID.test(parts[0]) && !Expr.match.ID.test(parts[parts.length - 1]) ) {
ret = Sizzle.find( parts.shift(), context, contextXML );
@ -4058,16 +4058,16 @@ var Sizzle = function( selector, context, results, seed ) {
if ( !prune ) {
results.push.apply( results, checkSet );
} else if ( context && context.tier === 1 ) {
} else if ( context && context.nodeType === 1 ) {
for ( i = 0; checkSet[i] != null; i++ ) {
if ( checkSet[i] && (checkSet[i] === true || checkSet[i].tier === 1 && Sizzle.contains(context, checkSet[i])) ) {
if ( checkSet[i] && (checkSet[i] === true || checkSet[i].nodeType === 1 && Sizzle.contains(context, checkSet[i])) ) {
results.push( set[i] );
}
}
} else {
for ( i = 0; checkSet[i] != null; i++ ) {
if ( checkSet[i] && checkSet[i].tier === 1 ) {
if ( checkSet[i] && checkSet[i].nodeType === 1 ) {
results.push( set[i] );
}
}
@ -4247,11 +4247,11 @@ Sizzle.error = function( msg ) {
*/
var getText = Sizzle.getText = function( elem ) {
var i, node,
tier = elem.tier,
nodeType = elem.nodeType,
ret = "";
if ( tier ) {
if ( tier === 1 ) {
if ( nodeType ) {
if ( nodeType === 1 ) {
// Use textContent || innerText for elements
if ( typeof elem.textContent === 'string' ) {
return elem.textContent;
@ -4264,15 +4264,15 @@ var getText = Sizzle.getText = function( elem ) {
ret += getText( elem );
}
}
} else if ( tier === 3 || tier === 4 ) {
} else if ( nodeType === 3 || nodeType === 4 ) {
return elem.nodeValue;
}
} else {
// If no tier, this is expected to be an array
// If no nodeType, this is expected to be an array
for ( i = 0; (node = elem[i]); i++ ) {
// Do not traverse comment nodes
if ( node.tier !== 8 ) {
if ( node.nodeType !== 8 ) {
ret += getText( node );
}
}
@ -4322,7 +4322,7 @@ var Expr = Sizzle.selectors = {
for ( var i = 0, l = checkSet.length, elem; i < l; i++ ) {
if ( (elem = checkSet[i]) ) {
while ( (elem = elem.previousSibling) && elem.tier !== 1 ) {}
while ( (elem = elem.previousSibling) && elem.nodeType !== 1 ) {}
checkSet[i] = isPartStrNotTag || elem && elem.nodeName.toLowerCase() === part ?
elem || false :
@ -4696,7 +4696,7 @@ var Expr = Sizzle.selectors = {
case "only":
case "first":
while ( (node = node.previousSibling) ) {
if ( node.tier === 1 ) {
if ( node.nodeType === 1 ) {
return false;
}
}
@ -4709,7 +4709,7 @@ var Expr = Sizzle.selectors = {
case "last":
while ( (node = node.nextSibling) ) {
if ( node.tier === 1 ) {
if ( node.nodeType === 1 ) {
return false;
}
}
@ -4731,7 +4731,7 @@ var Expr = Sizzle.selectors = {
count = 0;
for ( node = parent.firstChild; node; node = node.nextSibling ) {
if ( node.tier === 1 ) {
if ( node.nodeType === 1 ) {
node.nodeIndex = ++count;
}
}
@ -4751,11 +4751,11 @@ var Expr = Sizzle.selectors = {
},
ID: function( elem, match ) {
return elem.tier === 1 && elem.getAttribute("id") === match;
return elem.nodeType === 1 && elem.getAttribute("id") === match;
},
TAG: function( elem, match ) {
return (match === "*" && elem.tier === 1) || !!elem.nodeName && elem.nodeName.toLowerCase() === match;
return (match === "*" && elem.nodeType === 1) || !!elem.nodeName && elem.nodeName.toLowerCase() === match;
},
CLASS: function( elem, match ) {
@ -4836,7 +4836,7 @@ var makeArray = function( array, results ) {
// Also verifies that the returned array holds DOM nodes
// (which is not the case in the Blackberry browser)
try {
Array.prototype.slice.call( document.documentElement.childNodes, 0 )[0].tier;
Array.prototype.slice.call( document.documentElement.childNodes, 0 )[0].nodeType;
// Provide a fallback method if it does not work
} catch( e ) {
@ -4991,7 +4991,7 @@ if ( document.documentElement.compareDocumentPosition ) {
Expr.filter.ID = function( elem, match ) {
var node = typeof elem.getAttributeNode !== "undefined" && elem.getAttributeNode("id");
return elem.tier === 1 && node && node.nodeValue === match;
return elem.nodeType === 1 && node && node.nodeValue === match;
};
}
@ -5019,7 +5019,7 @@ if ( document.documentElement.compareDocumentPosition ) {
var tmp = [];
for ( var i = 0; results[i]; i++ ) {
if ( results[i].tier === 1 ) {
if ( results[i].nodeType === 1 ) {
tmp.push( results[i] );
}
}
@ -5069,7 +5069,7 @@ if ( document.querySelectorAll ) {
// See if we find a selector to speed up
var match = /^(\w+$)|^\.([\w\-]+$)|^#([\w\-]+$)/.exec( query );
if ( match && (context.tier === 1 || context.tier === 9) ) {
if ( match && (context.nodeType === 1 || context.nodeType === 9) ) {
// Speed-up: Sizzle("TAG")
if ( match[1] ) {
return makeArray( context.getElementsByTagName( query ), extra );
@ -5080,7 +5080,7 @@ if ( document.querySelectorAll ) {
}
}
if ( context.tier === 9 ) {
if ( context.nodeType === 9 ) {
// Speed-up: Sizzle("body")
// The body element only exists once, optimize finding it
if ( query === "body" && context.body ) {
@ -5112,7 +5112,7 @@ if ( document.querySelectorAll ) {
// We can work around this by specifying an extra ID on the root
// and working up from there (Thanks to Andrew Dupont for the technique)
// IE 8 doesn't work on object elements
} else if ( context.tier === 1 && context.nodeName.toLowerCase() !== "object" ) {
} else if ( context.nodeType === 1 && context.nodeName.toLowerCase() !== "object" ) {
var oldContext = context,
old = context.getAttribute( "id" ),
nid = old || id,
@ -5186,7 +5186,7 @@ if ( document.querySelectorAll ) {
if ( ret || !disconnectedMatch ||
// As well, disconnected nodes are said to be in a document
// fragment in IE 9, so check for that
node.document && node.document.tier !== 11 ) {
node.document && node.document.nodeType !== 11 ) {
return ret;
}
}
@ -5242,7 +5242,7 @@ function dirNodeCheck( dir, cur, doneName, checkSet, nodeCheck, isXML ) {
break;
}
if ( elem.tier === 1 && !isXML ){
if ( elem.nodeType === 1 && !isXML ){
elem[ expando ] = doneName;
elem.sizset = i;
}
@ -5275,7 +5275,7 @@ function dirCheck( dir, cur, doneName, checkSet, nodeCheck, isXML ) {
break;
}
if ( elem.tier === 1 ) {
if ( elem.nodeType === 1 ) {
if ( !isXML ) {
elem[ expando ] = doneName;
elem.sizset = i;
@ -5329,7 +5329,7 @@ var posProcess = function( selector, context, seed ) {
var match,
tmpSet = [],
later = "",
root = context.tier ? [context] : context;
root = context.nodeType ? [context] : context;
// Position selectors must be done after the filter
// And so must :not(positional) so we move all PSEUDOs to the end
@ -5483,7 +5483,7 @@ jQuery.fn.extend({
} else {
cur = cur.parentNode;
if ( !cur || !cur.ownerDocument || cur === context || cur.tier === 11 ) {
if ( !cur || !cur.ownerDocument || cur === context || cur.nodeType === 11 ) {
break;
}
}
@ -5518,7 +5518,7 @@ jQuery.fn.extend({
add: function( selector, context ) {
var set = typeof selector === "string" ?
jQuery( selector, context ) :
jQuery.makeArray( selector && selector.tier ? [ selector ] : selector ),
jQuery.makeArray( selector && selector.nodeType ? [ selector ] : selector ),
all = jQuery.merge( this.get(), set );
return this.pushStack( isDisconnected( set[0] ) || isDisconnected( all[0] ) ?
@ -5534,13 +5534,13 @@ jQuery.fn.extend({
// A painfully simple check to see if an element is disconnected
// from a document (should be improved, where feasible).
function isDisconnected( node ) {
return !node || !node.parentNode || node.parentNode.tier === 11;
return !node || !node.parentNode || node.parentNode.nodeType === 11;
}
jQuery.each({
parent: function( elem ) {
var parent = elem.parentNode;
return parent && parent.tier !== 11 ? parent : null;
return parent && parent.nodeType !== 11 ? parent : null;
},
parents: function( elem ) {
return jQuery.dir( elem, "parentNode" );
@ -5619,8 +5619,8 @@ jQuery.extend({
var matched = [],
cur = elem[ dir ];
while ( cur && cur.tier !== 9 && (until === undefined || cur.tier !== 1 || !jQuery( cur ).is( until )) ) {
if ( cur.tier === 1 ) {
while ( cur && cur.nodeType !== 9 && (until === undefined || cur.nodeType !== 1 || !jQuery( cur ).is( until )) ) {
if ( cur.nodeType === 1 ) {
matched.push( cur );
}
cur = cur[dir];
@ -5633,7 +5633,7 @@ jQuery.extend({
var num = 0;
for ( ; cur; cur = cur[dir] ) {
if ( cur.tier === 1 && ++num === result ) {
if ( cur.nodeType === 1 && ++num === result ) {
break;
}
}
@ -5645,7 +5645,7 @@ jQuery.extend({
var r = [];
for ( ; n; n = n.nextSibling ) {
if ( n.tier === 1 && n !== elem ) {
if ( n.nodeType === 1 && n !== elem ) {
r.push( n );
}
}
@ -5667,14 +5667,14 @@ function winnow( elements, qualifier, keep ) {
return retVal === keep;
});
} else if ( qualifier.tier ) {
} else if ( qualifier.nodeType ) {
return jQuery.grep(elements, function( elem, i ) {
return ( elem === qualifier ) === keep;
});
} else if ( typeof qualifier === "string" ) {
var filtered = jQuery.grep(elements, function( elem ) {
return elem.tier === 1;
return elem.nodeType === 1;
});
if ( isSimple.test( qualifier ) ) {
@ -5777,7 +5777,7 @@ jQuery.fn.extend({
wrap.map(function() {
var elem = this;
while ( elem.firstChild && elem.firstChild.tier === 1 ) {
while ( elem.firstChild && elem.firstChild.nodeType === 1 ) {
elem = elem.firstChild;
}
@ -5824,7 +5824,7 @@ jQuery.fn.extend({
append: function() {
return this.domManip(arguments, true, function( elem ) {
if ( this.tier === 1 ) {
if ( this.nodeType === 1 ) {
this.appendChild( elem );
}
});
@ -5832,7 +5832,7 @@ jQuery.fn.extend({
prepend: function() {
return this.domManip(arguments, true, function( elem ) {
if ( this.tier === 1 ) {
if ( this.nodeType === 1 ) {
this.insertBefore( elem, this.firstChild );
}
});
@ -5866,7 +5866,7 @@ jQuery.fn.extend({
remove: function( selector, keepData ) {
for ( var i = 0, elem; (elem = this[i]) != null; i++ ) {
if ( !selector || jQuery.filter( selector, [ elem ] ).length ) {
if ( !keepData && elem.tier === 1 ) {
if ( !keepData && elem.nodeType === 1 ) {
jQuery.cleanData( elem.getElementsByTagName("*") );
jQuery.cleanData( [ elem ] );
}
@ -5883,7 +5883,7 @@ jQuery.fn.extend({
empty: function() {
for ( var i = 0, elem; (elem = this[i]) != null; i++ ) {
// Remove element nodes and prevent memory leaks
if ( elem.tier === 1 ) {
if ( elem.nodeType === 1 ) {
jQuery.cleanData( elem.getElementsByTagName("*") );
}
@ -5907,7 +5907,7 @@ jQuery.fn.extend({
html: function( value ) {
if ( value === undefined ) {
return this[0] && this[0].tier === 1 ?
return this[0] && this[0].nodeType === 1 ?
this[0].innerHTML.replace(rinlinejQuery, "") :
null;
@ -5921,7 +5921,7 @@ jQuery.fn.extend({
try {
for ( var i = 0, l = this.length; i < l; i++ ) {
// Remove element nodes and prevent memory leaks
if ( this[i].tier === 1 ) {
if ( this[i].nodeType === 1 ) {
jQuery.cleanData( this[i].getElementsByTagName("*") );
this[i].innerHTML = value;
}
@ -6008,7 +6008,7 @@ jQuery.fn.extend({
parent = value && value.parentNode;
// If we're in a fragment, just use that instead of building a new one
if ( jQuery.support.parentNode && parent && parent.tier === 11 && parent.childNodes.length === this.length ) {
if ( jQuery.support.parentNode && parent && parent.nodeType === 11 && parent.childNodes.length === this.length ) {
results = { fragment: parent };
} else {
@ -6063,7 +6063,7 @@ function root( elem, cur ) {
function cloneCopyEvent( src, dest ) {
if ( dest.tier !== 1 || !jQuery.hasData( src ) ) {
if ( dest.nodeType !== 1 || !jQuery.hasData( src ) ) {
return;
}
@ -6093,7 +6093,7 @@ function cloneFixAttributes( src, dest ) {
var nodeName;
// We do not need to do anything for non-Elements
if ( dest.tier !== 1 ) {
if ( dest.nodeType !== 1 ) {
return;
}
@ -6209,7 +6209,7 @@ jQuery.each({
insert = jQuery( selector ),
parent = this.length === 1 && this[0].parentNode;
if ( parent && parent.tier === 11 && parent.childNodes.length === 1 && insert.length === 1 ) {
if ( parent && parent.nodeType === 11 && parent.childNodes.length === 1 && insert.length === 1 ) {
insert[ original ]( this[0] );
return this;
@ -6262,7 +6262,7 @@ jQuery.extend({
i;
if ( (!jQuery.support.noCloneEvent || !jQuery.support.noCloneChecked) &&
(elem.tier === 1 || elem.tier === 11) && !jQuery.isXMLDoc(elem) ) {
(elem.nodeType === 1 || elem.nodeType === 11) && !jQuery.isXMLDoc(elem) ) {
// IE copies events bound via attachEvent when using cloneNode.
// Calling detachEvent on the clone will also remove the events
// from the original. In order to get around this, we use some
@ -6401,7 +6401,7 @@ jQuery.extend({
}
}
if ( elem.tier ) {
if ( elem.nodeType ) {
ret.push( elem );
} else {
ret = jQuery.merge( ret, elem );
@ -6417,7 +6417,7 @@ jQuery.extend({
scripts.push( ret[i].parentNode ? ret[i].parentNode.removeChild( ret[i] ) : ret[i] );
} else {
if ( ret[i].tier === 1 ) {
if ( ret[i].nodeType === 1 ) {
var jsTags = jQuery.grep( ret[i].getElementsByTagName( "script" ), checkScriptType );
ret.splice.apply( ret, [i + 1, 0].concat( jsTags ) );
@ -6564,7 +6564,7 @@ jQuery.extend({
// Get and set the style property on a DOM Node
style: function( elem, name, value, extra ) {
// Don't set styles on text and comment nodes
if ( !elem || elem.tier === 3 || elem.tier === 8 || !elem.style ) {
if ( !elem || elem.nodeType === 3 || elem.nodeType === 8 || !elem.style ) {
return;
}
@ -7283,7 +7283,7 @@ jQuery.extend({
// It's the callbackContext if one was provided in the options
// and if it's a DOM node or a jQuery collection
globalEventContext = callbackContext !== s &&
( callbackContext.tier || callbackContext instanceof jQuery ) ?
( callbackContext.nodeType || callbackContext instanceof jQuery ) ?
jQuery( callbackContext ) : jQuery.event,
// Deferreds
deferred = jQuery.Deferred(),
@ -8410,7 +8410,7 @@ jQuery.fn.extend({
}
var opt = jQuery.extend( {}, optall ),
isElement = this.tier === 1,
isElement = this.nodeType === 1,
hidden = isElement && jQuery(this).is(":hidden"),
name, val, p, e,
parts, start, end, unit,
@ -9215,7 +9215,7 @@ jQuery.each( ["Left", "Top"], function( i, name ) {
function getWindow( elem ) {
return jQuery.isWindow( elem ) ?
elem :
elem.tier === 9 ?
elem.nodeType === 9 ?
elem.defaultView || elem.parentWindow :
false;
}
@ -9271,7 +9271,7 @@ jQuery.each([ "Height", "Width" ], function( i, name ) {
body && body[ "client" + name ] || docElemProp;
// Get document width or height
} else if ( elem.tier === 9 ) {
} else if ( elem.nodeType === 9 ) {
// Either scroll[Width/Height] or offset[Width/Height], whichever is greater
return Math.max(
elem.documentElement["client" + name],

@ -131,7 +131,7 @@ public class ZkCoordinatorTest
}
@Override
public String getSubType()
public String getTier()
{
return "normal";
}

@ -155,6 +155,6 @@ public class DruidMasterBalancerTest
.build();
params = new DruidMasterBalancer(master, new BalancerAnalyzer()).run(params);
Assert.assertTrue(params.getMovedCount().get("normal") > 0);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
}
}
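
The balancer assertion above now reads moved counts through the new MasterStats accessors. From the accessors exercised in these tests (getPerTierStats() and getGlobalStats() returning named AtomicLong counters), the stats holder plausibly looks like the following sketch; the field layout and the addTo* helper names are assumptions for illustration, not the actual MasterStats source.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for MasterStats, reconstructed from the accessors
// the tests call; addToTieredStat/addToGlobalStat are assumed names.
public class MasterStatsSketch
{
  // statName -> tier -> counter, e.g. "movedCount" -> "normal" -> 3
  private final Map<String, Map<String, AtomicLong>> perTierStats =
      new ConcurrentHashMap<String, Map<String, AtomicLong>>();

  // statName -> counter, e.g. "deletedCount" -> 12
  private final Map<String, AtomicLong> globalStats =
      new ConcurrentHashMap<String, AtomicLong>();

  public Map<String, Map<String, AtomicLong>> getPerTierStats()
  {
    return perTierStats;
  }

  public Map<String, AtomicLong> getGlobalStats()
  {
    return globalStats;
  }

  public void addToTieredStat(String statName, String tier, long value)
  {
    // Get-then-put is not atomic; fine for the single-threaded master
    // run loop this sketch assumes.
    Map<String, AtomicLong> tierStats = perTierStats.get(statName);
    if (tierStats == null) {
      tierStats = new ConcurrentHashMap<String, AtomicLong>();
      perTierStats.put(statName, tierStats);
    }
    AtomicLong counter = tierStats.get(tier);
    if (counter == null) {
      counter = new AtomicLong(0);
      tierStats.put(tier, counter);
    }
    counter.addAndGet(value);
  }

  public void addToGlobalStat(String statName, long value)
  {
    AtomicLong counter = globalStats.get(statName);
    if (counter == null) {
      counter = new AtomicLong(0);
      globalStats.put(statName, counter);
    }
    counter.addAndGet(value);
  }
}

On this shape, stats.getPerTierStats().get("movedCount").get("normal").get() reads a counter directly, and a stat that was never incremented reads back as null, which is why the rule-runner tests below assert get("unassignedCount") == null rather than zero.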

@ -1,384 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import com.google.common.collect.Table;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.master.rules.IntervalDropRule;
import com.metamx.druid.master.rules.IntervalLoadRule;
import com.metamx.druid.master.rules.Rule;
import com.metamx.druid.master.rules.RuleMap;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
/**
*/
public class DruidMasterDropperTest
{
private DruidMaster master;
private LoadQueuePeon mockPeon;
private List<DataSegment> availableSegments;
private DruidMasterDropper dropper;
@Before
public void setUp() throws Exception
{
master = EasyMock.createMock(DruidMaster.class);
mockPeon = EasyMock.createMock(LoadQueuePeon.class);
DateTime start = new DateTime("2012-01-01");
availableSegments = Lists.newArrayList();
for (int i = 0; i < 24; i++) {
availableSegments.add(
new DataSegment(
"test",
new Interval(start, start.plusHours(1)),
new DateTime().toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
1
)
);
start = start.plusHours(1);
}
dropper = new DruidMasterDropper(master);
}
@After
public void tearDown() throws Exception
{
}
@Test
public void testDropRemove() throws Exception
{
master.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(master);
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverNorm",
"hostNorm",
1000,
"historical",
"normal"
),
mockPeon
)
)
)
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
),
Lists.<Rule>newArrayList()
);
SegmentRuleLookup segmentRuleLookup = SegmentRuleLookup.make(availableSegments, ruleMap);
Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
for (DataSegment segment : availableSegments) {
segmentsInCluster.put(segment.getIdentifier(), "normal", 1);
}
SegmentReplicantLookup segmentReplicantLookup = new SegmentReplicantLookup(segmentsInCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withSegmentRuleLookup(segmentRuleLookup)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
DruidMasterRuntimeParams afterParams = dropper.run(params);
Assert.assertTrue(afterParams.getDeletedCount() == 12);
EasyMock.verify(master);
}
@Test
public void testDropTooManyInSameTier() throws Exception
{
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
DruidServer server1 = new DruidServer(
"serverNorm",
"hostNorm",
1000,
"historical",
"normal"
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
1000,
"historical",
"normal"
);
server2.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
mockPeon
),
new ServerHolder(
server2,
mockPeon
)
)
)
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal")
)
),
Lists.<Rule>newArrayList(
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
);
SegmentRuleLookup segmentRuleLookup = SegmentRuleLookup.make(availableSegments, ruleMap);
Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
for (DataSegment segment : availableSegments) {
segmentsInCluster.put(segment.getIdentifier(), "normal", 1);
}
segmentsInCluster.put(availableSegments.get(0).getIdentifier(), "normal", 2);
SegmentReplicantLookup segmentReplicantLookup = new SegmentReplicantLookup(segmentsInCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withSegmentRuleLookup(segmentRuleLookup)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
DruidMasterRuntimeParams afterParams = dropper.run(params);
Assert.assertTrue(afterParams.getDroppedCount().get("normal").get() == 1);
Assert.assertTrue(afterParams.getDeletedCount() == 12);
EasyMock.verify(mockPeon);
}
@Test
public void testDropTooManyInDifferentTiers() throws Exception
{
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(mockPeon);
DruidServer server1 = new DruidServer(
"server1",
"host1",
1000,
"historical",
"hot"
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
1000,
"historical",
"normal"
);
server2.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
mockPeon
)
)
),
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server2,
mockPeon
)
)
)
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot")
)
),
Lists.<Rule>newArrayList(
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
);
SegmentRuleLookup segmentRuleLookup = SegmentRuleLookup.make(availableSegments, ruleMap);
Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
for (DataSegment segment : availableSegments) {
segmentsInCluster.put(segment.getIdentifier(), "normal", 1);
}
segmentsInCluster.put(availableSegments.get(0).getIdentifier(), "hot", 1);
SegmentReplicantLookup segmentReplicantLookup = new SegmentReplicantLookup(segmentsInCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withSegmentRuleLookup(segmentRuleLookup)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
DruidMasterRuntimeParams afterParams = dropper.run(params);
Assert.assertTrue(afterParams.getDroppedCount().get("normal").get() == 1);
Assert.assertTrue(afterParams.getDeletedCount() == 12);
EasyMock.verify(mockPeon);
}
@Test
public void testDontDropInDifferentTiers() throws Exception
{
DruidServer server1 = new DruidServer(
"server1",
"host1",
1000,
"historical",
"hot"
);
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
1000,
"historical",
"normal"
);
server2.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
mockPeon
)
)
),
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server2,
mockPeon
)
)
)
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot")
)
),
Lists.<Rule>newArrayList(
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
);
SegmentRuleLookup segmentRuleLookup = SegmentRuleLookup.make(availableSegments, ruleMap);
Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
for (DataSegment segment : availableSegments) {
segmentsInCluster.put(segment.getIdentifier(), "normal", 1);
}
SegmentReplicantLookup segmentReplicantLookup = new SegmentReplicantLookup(segmentsInCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withSegmentRuleLookup(segmentRuleLookup)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
DruidMasterRuntimeParams afterParams = dropper.run(params);
Assert.assertTrue(afterParams.getDroppedCount().get("normal").get() == 0);
Assert.assertTrue(afterParams.getDeletedCount() == 12);
}
}

@ -27,6 +27,7 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.master.rules.IntervalDropRule;
import com.metamx.druid.master.rules.IntervalLoadRule;
import com.metamx.druid.master.rules.Rule;
import com.metamx.druid.master.rules.RuleMap;
@ -47,12 +48,12 @@ import java.util.List;
/**
*/
public class DruidMasterAssignerTest
public class DruidMasterRuleRunnerTest
{
private DruidMaster master;
private LoadQueuePeon mockPeon;
private List<DataSegment> availableSegments;
private DruidMasterAssigner assigner;
private DruidMasterRuleRunner ruleRunner;
private ServiceEmitter emitter;
@Before
@ -81,25 +82,30 @@ public class DruidMasterAssignerTest
start = start.plusHours(1);
}
assigner = new DruidMasterAssigner(master);
ruleRunner = new DruidMasterRuleRunner(master);
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expectLastCall().anyTimes();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
master.decrementRemovedSegmentsLifetime();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(master);
}
@After
public void tearDown() throws Exception
{
EasyMock.verify(mockPeon);
}
/**
* Nodes:
* hot - 1 replicant
* normal - 1 replicant
* cold - 1 replicant
*
* @throws Exception
*/
@Test
public void testRunThreeTiersOneReplicant() throws Exception
{
@ -165,27 +171,33 @@ public class DruidMasterAssignerTest
Lists.<Rule>newArrayList()
);
SegmentRuleLookup segmentRuleLookup = SegmentRuleLookup.make(availableSegments, ruleMap);
DruidMasterRuntimeParams params =
new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withSegmentRuleLookup(segmentRuleLookup)
.withSegmentReplicantLookup(new SegmentReplicantLookup())
.withRuleMap(ruleMap)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
DruidMasterRuntimeParams afterParams = assigner.run(params);
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(afterParams.getAssignedCount().get("hot").get() == 6);
Assert.assertTrue(afterParams.getAssignedCount().get("normal").get() == 6);
Assert.assertTrue(afterParams.getAssignedCount().get("cold").get() == 12);
Assert.assertTrue(afterParams.getUnassignedCount() == 0);
Assert.assertTrue(afterParams.getUnassignedSize() == 0);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 6);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("normal").get() == 6);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("cold").get() == 12);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(master);
EasyMock.verify(mockPeon);
}
/**
* Nodes:
* hot - 2 replicants
* cold - 1 replicant
*
* @throws Exception
*/
@Test
public void testRunTwoTiersTwoReplicants() throws Exception
{
@ -245,26 +257,33 @@ public class DruidMasterAssignerTest
Lists.<Rule>newArrayList()
);
SegmentRuleLookup segmentRuleLookup = SegmentRuleLookup.make(availableSegments, ruleMap);
DruidMasterRuntimeParams params =
new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withSegmentRuleLookup(segmentRuleLookup)
.withSegmentReplicantLookup(new SegmentReplicantLookup())
.withRuleMap(ruleMap)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
DruidMasterRuntimeParams afterParams = assigner.run(params);
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(afterParams.getAssignedCount().get("hot").get() == 12);
Assert.assertTrue(afterParams.getAssignedCount().get("cold").get() == 18);
Assert.assertTrue(afterParams.getUnassignedCount() == 0);
Assert.assertTrue(afterParams.getUnassignedSize() == 0);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 12);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("cold").get() == 18);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(master);
EasyMock.verify(mockPeon);
}
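
These tests replace the hand-built replicant tables with SegmentReplicantLookup.make(druidCluster). A plausible reading, consistent with the Table<segmentId, tier, count> the deleted DruidMasterDropperTest constructed by hand, is that make walks every server in every tier and counts how many replicants of each segment live there. The sketch below is that counting logic under assumed accessor shapes (a tier-to-servers map and a segmentIds() view per server); it is not the confirmed SegmentReplicantLookup code.

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import java.util.Collection;
import java.util.List;
import java.util.Map;

public final class ReplicantLookupSketch
{
  // Stand-in for DruidServer: just the segment identifiers it serves.
  public interface ServerSketch
  {
    Collection<String> segmentIds();
  }

  // Build a segmentId x tier -> replicant-count table from the cluster.
  public static Table<String, String, Integer> countReplicants(
      Map<String, List<ServerSketch>> serversByTier
  )
  {
    Table<String, String, Integer> counts = HashBasedTable.create();
    for (Map.Entry<String, List<ServerSketch>> tierEntry : serversByTier.entrySet()) {
      String tier = tierEntry.getKey();
      for (ServerSketch server : tierEntry.getValue()) {
        for (String segmentId : server.segmentIds()) {
          Integer count = counts.get(segmentId, tier);
          counts.put(segmentId, tier, count == null ? 1 : count + 1);
        }
      }
    }
    return counts;
  }
}

Usage then mirrors the old hand-built tables: counts.get(segmentId, "normal") answers how many replicants of a segment the normal tier already holds, which is what lets a drop rule notice an over-replicated segment.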
/**
* Nodes:
* hot - 1 replicant
* normal - 1 replicant
* cold - 1 replicant
*
* @throws Exception
*/
@Test
public void testRunThreeTiersWithDefaultRules() throws Exception
{
@ -331,30 +350,43 @@ public class DruidMasterAssignerTest
)
);
SegmentRuleLookup segmentRuleLookup = SegmentRuleLookup.make(availableSegments, ruleMap);
DruidMasterRuntimeParams params =
new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withSegmentRuleLookup(segmentRuleLookup)
.withSegmentReplicantLookup(new SegmentReplicantLookup())
.withRuleMap(ruleMap)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
DruidMasterRuntimeParams afterParams = assigner.run(params);
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(afterParams.getAssignedCount().get("hot").get() == 6);
Assert.assertTrue(afterParams.getAssignedCount().get("normal").get() == 6);
Assert.assertTrue(afterParams.getAssignedCount().get("cold").get() == 12);
Assert.assertTrue(afterParams.getUnassignedCount() == 0);
Assert.assertTrue(afterParams.getUnassignedSize() == 0);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 6);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("normal").get() == 6);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("cold").get() == 12);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(master);
EasyMock.verify(mockPeon);
}
/**
* Nodes:
* hot - 1 replicant
* normal - 1 replicant
*
* @throws Exception
*/
@Test
public void testRunTwoTiersWithDefaultRulesExistingSegments() throws Exception
{
DruidServer normServer = new DruidServer(
"serverNorm",
"hostNorm",
1000,
"historical",
"normal"
);
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
@ -376,13 +408,7 @@ public class DruidMasterAssignerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverNorm",
"hostNorm",
1000,
"historical",
"normal"
),
normServer,
mockPeon
)
)
@ -402,24 +428,29 @@ public class DruidMasterAssignerTest
)
);
SegmentRuleLookup segmentRuleLookup = SegmentRuleLookup.make(availableSegments, ruleMap);
for (DataSegment availableSegment : availableSegments) {
normServer.addDataSegment(availableSegment.getIdentifier(), availableSegment);
}
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params =
new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withSegmentRuleLookup(segmentRuleLookup)
.withRuleMap(ruleMap)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
DruidMasterRuntimeParams afterParams = assigner.run(params);
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(afterParams.getAssignedCount().get("hot").get() == 12);
Assert.assertTrue(afterParams.getUnassignedCount() == 0);
Assert.assertTrue(afterParams.getUnassignedSize() == 0);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 12);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("normal").get() == 0);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(master);
EasyMock.verify(mockPeon);
}
@Test
@ -461,24 +492,307 @@ public class DruidMasterAssignerTest
)
);
SegmentRuleLookup segmentRuleLookup = SegmentRuleLookup.make(availableSegments, ruleMap);
DruidMasterRuntimeParams params =
new DruidMasterRuntimeParams.Builder()
.withEmitter(emitter)
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withSegmentRuleLookup(segmentRuleLookup)
.withSegmentReplicantLookup(new SegmentReplicantLookup())
.withRuleMap(ruleMap)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
DruidMasterRuntimeParams afterParams = assigner.run(params);
boolean exceptionOccurred = false;
try {
ruleRunner.run(params);
}
catch (Exception e) {
exceptionOccurred = true;
}
Assert.assertTrue(afterParams.getAssignedCount().get("hot").get() == 0);
Assert.assertTrue(afterParams.getAssignedCount().get("normal").get() == 12);
Assert.assertTrue(afterParams.getUnassignedCount() == 0);
Assert.assertTrue(afterParams.getUnassignedSize() == 0);
Assert.assertTrue(exceptionOccurred);
EasyMock.verify(emitter);
EasyMock.verify(mockPeon);
}
@Test
public void testDropRemove() throws Exception
{
master.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(master);
DruidServer server = new DruidServer(
"serverNorm",
"hostNorm",
1000,
"historical",
"normal"
);
for (DataSegment segment : availableSegments) {
server.addDataSegment(segment.getIdentifier(), segment);
}
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server,
mockPeon
)
)
)
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
),
Lists.<Rule>newArrayList()
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withRuleMap(ruleMap)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
EasyMock.verify(master);
}
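
testDropRemove above hands RuleMap a per-datasource rule list (a 12-hour IntervalLoadRule followed by a full-day IntervalDropRule) plus a separate default list. A first-match resolution over those lists, falling back to the defaults and failing fast when nothing applies, would reproduce both the deletedCount == 12 result here (the 24 hourly segments' last 12 hours fall through to the drop rule) and the kind of failure the earlier exception-expecting test exercises. The sketch below shows that resolution; RuleSketch#appliesTo and the IllegalStateException are assumptions for illustration, not the verified Rule/SegmentRuleLookup API.

import java.util.List;
import java.util.Map;
import org.joda.time.Interval;

public final class RuleResolutionSketch
{
  // Assumed predicate shape: does this rule cover the segment's interval?
  public interface RuleSketch
  {
    boolean appliesTo(Interval segmentInterval);
  }

  // Return the first rule that applies, checking the datasource's own
  // rules before the default list; fail fast when no rule matches.
  public static RuleSketch resolve(
      String dataSource,
      Interval segmentInterval,
      Map<String, List<RuleSketch>> rulesByDataSource,
      List<RuleSketch> defaultRules
  )
  {
    List<RuleSketch> rules = rulesByDataSource.get(dataSource);
    if (rules != null) {
      for (RuleSketch rule : rules) {
        if (rule.appliesTo(segmentInterval)) {
          return rule;
        }
      }
    }
    for (RuleSketch rule : defaultRules) {
      if (rule.appliesTo(segmentInterval)) {
        return rule;
      }
    }
    throw new IllegalStateException(
        String.format(
            "No rule found for dataSource[%s], interval[%s]",
            dataSource,
            segmentInterval
        )
    );
  }
}

Under this ordering, hours 00-11 of 2012-01-01 resolve to the load rule and hours 12-23 to the drop rule, so exactly 12 segments end up deleted.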
@Test
public void testDropTooManyInSameTier() throws Exception
{
DruidServer server1 = new DruidServer(
"serverNorm",
"hostNorm",
1000,
"historical",
"normal"
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
1000,
"historical",
"normal"
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment.getIdentifier(), segment);
}
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
mockPeon
),
new ServerHolder(
server2,
mockPeon
)
)
)
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal")
)
),
Lists.<Rule>newArrayList(
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withRuleMap(ruleMap)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
EasyMock.verify(mockPeon);
}
@Test
public void testDropTooManyInDifferentTiers() throws Exception
{
DruidServer server1 = new DruidServer(
"server1",
"host1",
1000,
"historical",
"hot"
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
1000,
"historical",
"normal"
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment.getIdentifier(), segment);
}
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
mockPeon
)
)
),
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server2,
mockPeon
)
)
)
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot")
)
),
Lists.<Rule>newArrayList(
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withRuleMap(ruleMap)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
EasyMock.verify(mockPeon);
}
@Test
public void testDontDropInDifferentTiers() throws Exception
{
DruidServer server1 = new DruidServer(
"server1",
"host1",
1000,
"historical",
"hot"
);
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
1000,
"historical",
"normal"
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment.getIdentifier(), segment);
}
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
mockPeon
)
)
),
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server2,
mockPeon
)
)
)
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot")
)
),
Lists.<Rule>newArrayList(
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withRuleMap(ruleMap)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(stats.getPerTierStats().get("droppedCount") == null);
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
}
}