caching for rules; get rules from cache

This commit is contained in:
Fangjin Yang 2012-12-03 16:13:20 -08:00
parent 5822f4f5f7
commit 5b8bd412c0
11 changed files with 368 additions and 448 deletions

View File

@ -1,196 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.db;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.master.rules.Rule;
import com.metamx.druid.master.rules.RuleMap;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
*/
public class DatabaseRuleCoordinator
{
private static final Logger log = new Logger(DatabaseRuleCoordinator.class);
private final ObjectMapper jsonMapper;
private final DatabaseRuleCoordinatorConfig config;
private final DBI dbi;
private final Object lock = new Object();
public DatabaseRuleCoordinator(ObjectMapper jsonMapper, DatabaseRuleCoordinatorConfig config, DBI dbi)
{
this.jsonMapper = jsonMapper;
this.config = config;
this.dbi = dbi;
}
public RuleMap getRuleMap()
{
Map<String, List<Rule>> assignmentRules = getAllRules();
return new RuleMap(
assignmentRules,
assignmentRules.get(config.getDefaultDatasource())
);
}
public Map<String, List<Rule>> getAllRules()
{
synchronized (lock) {
return dbi.withHandle(
new HandleCallback<Map<String, List<Rule>>>()
{
@Override
public Map<String, List<Rule>> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format("SELECT dataSource, payload FROM %s", config.getRuleTable())
).fold(
Maps.<String, List<Rule>>newHashMap(),
new Folder3<Map<String, List<Rule>>, Map<String, Object>>()
{
@Override
public Map<String, List<Rule>> fold(
Map<String, List<Rule>> retVal,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
String dataSource = MapUtils.getString(stringObjectMap, "dataSource");
List<Rule> rules = jsonMapper.readValue(
MapUtils.getString(stringObjectMap, "payload"), new TypeReference<List<Rule>>()
{
}
);
retVal.put(dataSource, rules);
return retVal;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
}
);
}
}
public List<Rule> getRules(final String dataSource)
{
synchronized (lock) {
return dbi.withHandle(
new HandleCallback<List<Rule>>()
{
@Override
public List<Rule> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format("SELECT payload FROM %s WHERE dataSource = :dataSource", config.getRuleTable())
)
.bind("dataSource", dataSource)
.fold(
Lists.<Rule>newArrayList(),
new Folder3<ArrayList<Rule>, Map<String, Object>>()
{
@Override
public ArrayList<Rule> fold(
ArrayList<Rule> rules,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
return jsonMapper.readValue(
MapUtils.getString(stringObjectMap, "payload"), new TypeReference<List<Rule>>()
{
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
}
);
}
}
public boolean overrideRule(final String dataSource, final List<Rule> rules)
{
synchronized (lock) {
try {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, ruleVersion, payload) VALUES (:id, :dataSource, :ruleVersion, :payload)",
config.getRuleTable()
)
)
.bind("id", String.format("%s_%s", dataSource, config.getRuleVersion()))
.bind("dataSource", dataSource)
.bind("ruleVersion", config.getRuleVersion())
.bind("payload", jsonMapper.writeValueAsString(rules))
.execute();
return null;
}
}
);
}
catch (Exception e) {
log.error(e, String.format("Exception while overriding rule for %s", dataSource));
return false;
}
}
return true;
}
}

View File

@ -0,0 +1,240 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.db;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.MapUtils;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.master.rules.Rule;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.Duration;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class DatabaseRuleManager
{
private static final Logger log = new Logger(DatabaseRuleManager.class);
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService exec;
private final DatabaseRuleManagerConfig config;
private final DBI dbi;
private final AtomicReference<ConcurrentHashMap<String, List<Rule>>> rules;
private final Object lock = new Object();
private volatile boolean started = false;
public DatabaseRuleManager(
ObjectMapper jsonMapper,
ScheduledExecutorService exec,
DatabaseRuleManagerConfig config,
DBI dbi
)
{
this.jsonMapper = jsonMapper;
this.exec = exec;
this.config = config;
this.dbi = dbi;
this.rules = new AtomicReference<ConcurrentHashMap<String, List<Rule>>>(
new ConcurrentHashMap<String, List<Rule>>()
);
}
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
ScheduledExecutors.scheduleWithFixedDelay(
exec,
new Duration(0),
config.getRulesPollDuration(),
new Runnable()
{
@Override
public void run()
{
poll();
}
}
);
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
rules.set(new ConcurrentHashMap<String, List<Rule>>());
started = false;
}
}
public void poll()
{
try {
ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<String, List<Rule>>(
dbi.withHandle(
new HandleCallback<Map<String, List<Rule>>>()
{
@Override
public Map<String, List<Rule>> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format("SELECT dataSource, payload FROM %s", config.getRuleTable())
).fold(
Maps.<String, List<Rule>>newHashMap(),
new Folder3<Map<String, List<Rule>>, Map<String, Object>>()
{
@Override
public Map<String, List<Rule>> fold(
Map<String, List<Rule>> retVal,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
String dataSource = MapUtils.getString(stringObjectMap, "dataSource");
List<Rule> rules = jsonMapper.readValue(
MapUtils.getString(stringObjectMap, "payload"), new TypeReference<List<Rule>>()
{
}
);
retVal.put(dataSource, rules);
return retVal;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
}
)
);
log.info("Polled and found rules for %,d datasource(s)", newRules.size());
rules.set(newRules);
}
catch (Exception e) {
log.error(e, "Exception while polling for rules");
}
}
public Map<String, List<Rule>> getAllRules()
{
return rules.get();
}
public List<Rule> getRules(final String dataSource)
{
return rules.get().get(dataSource);
}
public List<Rule> getRulesWithDefault(final String dataSource)
{
List<Rule> retVal = Lists.newArrayList();
Map<String, List<Rule>> theRules = rules.get();
if (theRules.get(dataSource) != null) {
retVal.addAll(theRules.get(dataSource));
}
if (theRules.get(config.getDefaultDatasource()) != null) {
retVal.addAll(theRules.get(config.getDefaultDatasource()));
}
return retVal;
}
public boolean overrideRule(final String dataSource, final List<Rule> newRules)
{
synchronized (lock) {
try {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, ruleVersion, payload) VALUES (:id, :dataSource, :ruleVersion, :payload)",
config.getRuleTable()
)
)
.bind("id", String.format("%s_%s", dataSource, config.getRuleVersion()))
.bind("dataSource", dataSource)
.bind("ruleVersion", config.getRuleVersion())
.bind("payload", jsonMapper.writeValueAsString(newRules))
.execute();
return null;
}
}
);
ConcurrentHashMap<String, List<Rule>> existingRules = rules.get();
if (existingRules == null) {
existingRules = new ConcurrentHashMap<String, List<Rule>>();
}
existingRules.put(dataSource, newRules);
}
catch (Exception e) {
log.error(e, String.format("Exception while overriding rule for %s", dataSource));
return false;
}
}
return true;
}
}

View File

@ -19,11 +19,13 @@
package com.metamx.druid.db;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class DatabaseRuleCoordinatorConfig
public abstract class DatabaseRuleManagerConfig
{
@Config("druid.database.ruleTable")
public abstract String getRuleTable();
@ -33,4 +35,8 @@ public abstract class DatabaseRuleCoordinatorConfig
@Config("druid.database.ruleVersion")
public abstract String getRuleVersion();
@Config("druid.database.rules.poll.duration")
@Default("PT1M")
public abstract Duration getRulesPollDuration();
}

View File

@ -31,7 +31,7 @@ import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.coordination.DruidClusterInfo;
import com.metamx.druid.db.DatabaseRuleCoordinator;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.master.rules.Rule;
@ -59,20 +59,20 @@ public class InfoResource
{
private final ServerInventoryManager serverInventoryManager;
private final DatabaseSegmentManager databaseSegmentManager;
private final DatabaseRuleCoordinator databaseRuleCoordinator;
private final DatabaseRuleManager databaseRuleManager;
private final DruidClusterInfo druidClusterInfo;
@Inject
public InfoResource(
ServerInventoryManager serverInventoryManager,
DatabaseSegmentManager databaseSegmentManager,
DatabaseRuleCoordinator databaseRuleCoordinator,
DatabaseRuleManager databaseRuleManager,
DruidClusterInfo druidClusterInfo
)
{
this.serverInventoryManager = serverInventoryManager;
this.databaseSegmentManager = databaseSegmentManager;
this.databaseRuleCoordinator = databaseRuleCoordinator;
this.databaseRuleManager = databaseRuleManager;
this.druidClusterInfo = druidClusterInfo;
}
@ -272,7 +272,7 @@ public class InfoResource
public Response getRules()
{
return Response.status(Response.Status.OK)
.entity(databaseRuleCoordinator.getAllRules())
.entity(databaseRuleManager.getAllRules())
.build();
}
@ -284,7 +284,7 @@ public class InfoResource
)
{
return Response.status(Response.Status.OK)
.entity(databaseRuleCoordinator.getRules(dataSourceName))
.entity(databaseRuleManager.getRules(dataSourceName))
.build();
}
@ -296,7 +296,7 @@ public class InfoResource
final List<Rule> rules
)
{
if (databaseRuleCoordinator.overrideRule(dataSourceName, rules)) {
if (databaseRuleManager.overrideRule(dataSourceName, rules)) {
return Response.status(Response.Status.OK).build();
}
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();

View File

@ -34,8 +34,8 @@ import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.client.ServerInventoryManagerConfig;
import com.metamx.druid.coordination.DruidClusterInfo;
import com.metamx.druid.coordination.DruidClusterInfoConfig;
import com.metamx.druid.db.DatabaseRuleCoordinator;
import com.metamx.druid.db.DatabaseRuleCoordinatorConfig;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseRuleManagerConfig;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.db.DatabaseSegmentManagerConfig;
import com.metamx.druid.db.DbConnector;
@ -125,9 +125,10 @@ public class MasterMain
configFactory.build(DatabaseSegmentManagerConfig.class),
dbi
);
final DatabaseRuleCoordinator databaseRuleCoordinator = new DatabaseRuleCoordinator(
final DatabaseRuleManager databaseRuleManager = new DatabaseRuleManager(
jsonMapper,
configFactory.build(DatabaseRuleCoordinatorConfig.class),
scheduledExecutorFactory.create(1, "DatabaseRuleManager-Exec--%d"),
configFactory.build(DatabaseRuleManagerConfig.class),
dbi
);
@ -177,7 +178,7 @@ public class MasterMain
jsonMapper,
databaseSegmentManager,
serverInventoryManager,
databaseRuleCoordinator,
databaseRuleManager,
masterYp,
emitter,
scheduledExecutorFactory,
@ -212,7 +213,7 @@ public class MasterMain
new MasterServletModule(
serverInventoryManager,
databaseSegmentManager,
databaseRuleCoordinator,
databaseRuleManager,
druidClusterInfo,
master,
jsonMapper

View File

@ -22,7 +22,7 @@ package com.metamx.druid.http;
import com.google.inject.Provides;
import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.coordination.DruidClusterInfo;
import com.metamx.druid.db.DatabaseRuleCoordinator;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.master.DruidMaster;
import com.sun.jersey.guice.JerseyServletModule;
@ -38,7 +38,7 @@ public class MasterServletModule extends JerseyServletModule
{
private final ServerInventoryManager serverInventoryManager;
private final DatabaseSegmentManager segmentInventoryManager;
private final DatabaseRuleCoordinator databaseRuleCoordinator;
private final DatabaseRuleManager databaseRuleManager;
private final DruidClusterInfo druidClusterInfo;
private final DruidMaster master;
private final ObjectMapper jsonMapper;
@ -46,7 +46,7 @@ public class MasterServletModule extends JerseyServletModule
public MasterServletModule(
ServerInventoryManager serverInventoryManager,
DatabaseSegmentManager segmentInventoryManager,
DatabaseRuleCoordinator databaseRuleCoordinator,
DatabaseRuleManager databaseRuleManager,
DruidClusterInfo druidClusterInfo,
DruidMaster master,
ObjectMapper jsonMapper
@ -54,7 +54,7 @@ public class MasterServletModule extends JerseyServletModule
{
this.serverInventoryManager = serverInventoryManager;
this.segmentInventoryManager = segmentInventoryManager;
this.databaseRuleCoordinator = databaseRuleCoordinator;
this.databaseRuleManager = databaseRuleManager;
this.druidClusterInfo = druidClusterInfo;
this.master = master;
this.jsonMapper = jsonMapper;
@ -67,7 +67,7 @@ public class MasterServletModule extends JerseyServletModule
bind(MasterResource.class);
bind(ServerInventoryManager.class).toInstance(serverInventoryManager);
bind(DatabaseSegmentManager.class).toInstance(segmentInventoryManager);
bind(DatabaseRuleCoordinator.class).toInstance(databaseRuleCoordinator);
bind(DatabaseRuleManager.class).toInstance(databaseRuleManager);
bind(DruidMaster.class).toInstance(master);
bind(DruidClusterInfo.class).toInstance(druidClusterInfo);

View File

@ -40,7 +40,7 @@ import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.coordination.DruidClusterInfo;
import com.metamx.druid.db.DatabaseRuleCoordinator;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
@ -78,7 +78,7 @@ public class DruidMaster
private final DruidClusterInfo clusterInfo;
private final DatabaseSegmentManager databaseSegmentManager;
private final ServerInventoryManager serverInventoryManager;
private final DatabaseRuleCoordinator databaseRuleCoordinator;
private final DatabaseRuleManager databaseRuleManager;
private final PhoneBook yp;
private final ServiceEmitter emitter;
private final ScheduledExecutorService exec;
@ -95,7 +95,7 @@ public class DruidMaster
ObjectMapper jsonMapper,
DatabaseSegmentManager databaseSegmentManager,
ServerInventoryManager serverInventoryManager,
DatabaseRuleCoordinator databaseRuleCoordinator,
DatabaseRuleManager databaseRuleManager,
PhoneBook zkPhoneBook,
ServiceEmitter emitter,
ScheduledExecutorFactory scheduledExecutorFactory,
@ -110,7 +110,7 @@ public class DruidMaster
this.databaseSegmentManager = databaseSegmentManager;
this.serverInventoryManager = serverInventoryManager;
this.databaseRuleCoordinator = databaseRuleCoordinator;
this.databaseRuleManager = databaseRuleManager;
this.yp = zkPhoneBook;
this.emitter = emitter;
@ -422,6 +422,7 @@ public class DruidMaster
log.info("I am the master, all must bow!");
master = true;
databaseSegmentManager.start();
databaseRuleManager.start();
serverInventoryManager.start();
final List<Pair<? extends MasterRunnable, Duration>> masterRunnables = Lists.newArrayList();
@ -651,7 +652,7 @@ public class DruidMaster
return params.buildFromExisting()
.withDruidCluster(cluster)
.withRuleMap(databaseRuleCoordinator.getRuleMap())
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
}

View File

@ -117,16 +117,6 @@ public class DruidMasterCleanup implements DruidMasterHelper
}
}
//List<String> removedMsgs = Lists.newArrayList();
//for (Map.Entry<String, AtomicLong> entry : unneededSegments.entrySet()) {
// removedMsgs.add(
// String.format(
// "[%s] : Removed %s unneeded segments among %,d servers",
// entry.getKey(), entry.getValue(), cluster.get(entry.getKey()).size()
// )
// );
//}
return params.buildFromExisting()
.withMasterStats(stats)
.build();

View File

@ -22,6 +22,7 @@ package com.metamx.druid.master;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.master.rules.Rule;
import com.metamx.druid.master.rules.RuleMap;
@ -52,13 +53,9 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
}
// Run through all matched rules for available segments
RuleMap ruleMap = params.getRuleMap();
DatabaseRuleManager databaseRuleManager = params.getDatabaseRuleManager();
for (DataSegment segment : params.getAvailableSegments()) {
List<Rule> rules = ruleMap.getRules(segment.getDataSource());
if (rules == null) {
throw new ISE("No rules found for segment[%s]", segment.getIdentifier());
}
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
boolean foundMatchingRule = false;
for (Rule rule : rules) {

View File

@ -24,7 +24,7 @@ import com.google.common.collect.Sets;
import com.metamx.common.guava.Comparators;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.db.DatabaseRuleCoordinator;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.master.rules.RuleMap;
import com.metamx.emitter.service.ServiceEmitter;
@ -39,7 +39,7 @@ public class DruidMasterRuntimeParams
{
private final long startTime;
private final DruidCluster druidCluster;
private final RuleMap ruleMap;
private final DatabaseRuleManager databaseRuleManager;
private final SegmentReplicantLookup segmentReplicantLookup;
private final Set<DruidDataSource> dataSources;
private final Set<DataSegment> availableSegments;
@ -53,7 +53,7 @@ public class DruidMasterRuntimeParams
public DruidMasterRuntimeParams(
long startTime,
DruidCluster druidCluster,
RuleMap ruleMap,
DatabaseRuleManager databaseRuleManager,
SegmentReplicantLookup segmentReplicantLookup,
Set<DruidDataSource> dataSources,
Set<DataSegment> availableSegments,
@ -67,7 +67,7 @@ public class DruidMasterRuntimeParams
{
this.startTime = startTime;
this.druidCluster = druidCluster;
this.ruleMap = ruleMap;
this.databaseRuleManager = databaseRuleManager;
this.segmentReplicantLookup = segmentReplicantLookup;
this.dataSources = dataSources;
this.availableSegments = availableSegments;
@ -89,9 +89,9 @@ public class DruidMasterRuntimeParams
return druidCluster;
}
public RuleMap getRuleMap()
public DatabaseRuleManager getDatabaseRuleManager()
{
return ruleMap;
return databaseRuleManager;
}
public SegmentReplicantLookup getSegmentReplicantLookup()
@ -154,7 +154,7 @@ public class DruidMasterRuntimeParams
return new Builder(
startTime,
druidCluster,
ruleMap,
databaseRuleManager,
segmentReplicantLookup,
dataSources,
availableSegments,
@ -171,7 +171,7 @@ public class DruidMasterRuntimeParams
{
private long startTime;
private DruidCluster druidCluster;
private RuleMap ruleMap;
private DatabaseRuleManager databaseRuleManager;
private SegmentReplicantLookup segmentReplicantLookup;
private final Set<DruidDataSource> dataSources;
private final Set<DataSegment> availableSegments;
@ -186,7 +186,7 @@ public class DruidMasterRuntimeParams
{
this.startTime = 0;
this.druidCluster = null;
this.ruleMap = null;
this.databaseRuleManager = null;
this.segmentReplicantLookup = null;
this.dataSources = Sets.newHashSet();
this.availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
@ -201,7 +201,7 @@ public class DruidMasterRuntimeParams
Builder(
long startTime,
DruidCluster cluster,
RuleMap ruleMap,
DatabaseRuleManager databaseRuleManager,
SegmentReplicantLookup segmentReplicantLookup,
Set<DruidDataSource> dataSources,
Set<DataSegment> availableSegments,
@ -215,7 +215,7 @@ public class DruidMasterRuntimeParams
{
this.startTime = startTime;
this.druidCluster = cluster;
this.ruleMap = ruleMap;
this.databaseRuleManager = databaseRuleManager;
this.segmentReplicantLookup = segmentReplicantLookup;
this.dataSources = dataSources;
this.availableSegments = availableSegments;
@ -232,7 +232,7 @@ public class DruidMasterRuntimeParams
return new DruidMasterRuntimeParams(
startTime,
druidCluster,
ruleMap,
databaseRuleManager,
segmentReplicantLookup,
dataSources,
availableSegments,
@ -257,9 +257,9 @@ public class DruidMasterRuntimeParams
return this;
}
public Builder withRuleMap(RuleMap ruleMap)
public Builder withDatabaseRuleManager(DatabaseRuleManager databaseRuleManager)
{
this.ruleMap = ruleMap;
this.databaseRuleManager = databaseRuleManager;
return this;
}

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.master.rules.IntervalDropRule;
import com.metamx.druid.master.rules.IntervalLoadRule;
import com.metamx.druid.master.rules.Rule;
@ -55,6 +56,7 @@ public class DruidMasterRuleRunnerTest
private List<DataSegment> availableSegments;
private DruidMasterRuleRunner ruleRunner;
private ServiceEmitter emitter;
private DatabaseRuleManager databaseRuleManager;
@Before
public void setUp()
@ -63,6 +65,7 @@ public class DruidMasterRuleRunnerTest
mockPeon = EasyMock.createMock(LoadQueuePeon.class);
emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
databaseRuleManager = EasyMock.createMock(DatabaseRuleManager.class);
DateTime start = new DateTime("2012-01-01");
availableSegments = Lists.newArrayList();
@ -96,6 +99,7 @@ public class DruidMasterRuleRunnerTest
@After
public void tearDown() throws Exception
{
EasyMock.verify(databaseRuleManager);
}
/**
@ -109,6 +113,15 @@ public class DruidMasterRuleRunnerTest
@Test
public void testRunThreeTiersOneReplicant() throws Exception
{
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "cold")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
@ -159,23 +172,11 @@ public class DruidMasterRuleRunnerTest
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "cold")
)
),
Lists.<Rule>newArrayList()
);
DruidMasterRuntimeParams params =
new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withRuleMap(ruleMap)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
@ -201,6 +202,14 @@ public class DruidMasterRuleRunnerTest
@Test
public void testRunTwoTiersTwoReplicants() throws Exception
{
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 2, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "cold")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
@ -246,22 +255,11 @@ public class DruidMasterRuleRunnerTest
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 2, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "cold")
)
),
Lists.<Rule>newArrayList()
);
DruidMasterRuntimeParams params =
new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withRuleMap(ruleMap)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
@ -280,106 +278,20 @@ public class DruidMasterRuleRunnerTest
* Nodes:
* hot - 1 replicant
* normal - 1 replicant
* cold - 1 replicant
*
* @throws Exception
*/
@Test
public void testRunThreeTiersWithDefaultRules() throws Exception
public void testRunTwoTiersWithExistingSegments() throws Exception
{
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
1000,
"historical",
"hot"
),
mockPeon
)
)
),
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverNorm",
"hostNorm",
1000,
"historical",
"normal"
),
mockPeon
)
)
),
"cold",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverCold",
"hostCold",
1000,
"historical",
"cold"
),
mockPeon
)
)
)
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal")
)
),
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "cold")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "normal")
)
);
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidMasterRuntimeParams params =
new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withRuleMap(ruleMap)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 6);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("normal").get() == 6);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("cold").get() == 12);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(mockPeon);
}
/**
* Nodes:
* hot - 1 replicant
* normal - 1 replicant
*
* @throws Exception
*/
@Test
public void testRunTwoTiersWithDefaultRulesExistingSegments() throws Exception
{
DruidServer normServer = new DruidServer(
"serverNorm",
"hostNorm",
@ -387,6 +299,10 @@ public class DruidMasterRuleRunnerTest
"historical",
"normal"
);
for (DataSegment availableSegment : availableSegments) {
normServer.addDataSegment(availableSegment.getIdentifier(), availableSegment);
}
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
@ -416,29 +332,13 @@ public class DruidMasterRuleRunnerTest
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot")
)
),
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "normal")
)
);
for (DataSegment availableSegment : availableSegments) {
normServer.addDataSegment(availableSegment.getIdentifier(), availableSegment);
}
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params =
new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withRuleMap(ruleMap)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
@ -460,6 +360,14 @@ public class DruidMasterRuleRunnerTest
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(emitter);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"normal",
@ -480,24 +388,12 @@ public class DruidMasterRuleRunnerTest
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot")
)
),
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "normal")
)
);
DruidMasterRuntimeParams params =
new DruidMasterRuntimeParams.Builder()
.withEmitter(emitter)
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withRuleMap(ruleMap)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
@ -522,6 +418,14 @@ public class DruidMasterRuleRunnerTest
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(master);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidServer server = new DruidServer(
"serverNorm",
"hostNorm",
@ -547,24 +451,13 @@ public class DruidMasterRuleRunnerTest
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
),
Lists.<Rule>newArrayList()
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withRuleMap(ruleMap)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
@ -579,6 +472,14 @@ public class DruidMasterRuleRunnerTest
@Test
public void testDropTooManyInSameTier() throws Exception
{
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidServer server1 = new DruidServer(
"serverNorm",
"hostNorm",
@ -617,25 +518,13 @@ public class DruidMasterRuleRunnerTest
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal")
)
),
Lists.<Rule>newArrayList(
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withRuleMap(ruleMap)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
@ -651,6 +540,14 @@ public class DruidMasterRuleRunnerTest
@Test
public void testDropTooManyInDifferentTiers() throws Exception
{
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidServer server1 = new DruidServer(
"server1",
"host1",
@ -693,25 +590,13 @@ public class DruidMasterRuleRunnerTest
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot")
)
),
Lists.<Rule>newArrayList(
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withRuleMap(ruleMap)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
@ -727,6 +612,14 @@ public class DruidMasterRuleRunnerTest
@Test
public void testDontDropInDifferentTiers() throws Exception
{
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidServer server1 = new DruidServer(
"server1",
"host1",
@ -767,25 +660,13 @@ public class DruidMasterRuleRunnerTest
)
);
RuleMap ruleMap = new RuleMap(
ImmutableMap.<String, List<Rule>>of(
"test",
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot")
)
),
Lists.<Rule>newArrayList(
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withRuleMap(ruleMap)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();