Merge pull request #553 from metamx/snapshot-coordinator

Snapshot coordinator state before running anything
Gian Merlino 2014-05-19 18:24:19 -07:00
commit a7760af65a
23 changed files with 462 additions and 199 deletions
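
The change in one sketch: rather than letting coordination logic read live, concurrently-mutated server state mid-run, the coordinator now copies that state into immutable objects up front and runs everything against the copy. A minimal, self-contained illustration with hypothetical names, not Druid's classes:

import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.List;

public class SnapshotSketch
{
  public static void main(String[] args)
  {
    // Mutable state, standing in for the live server inventory.
    List<String> liveSegments = new ArrayList<>();
    liveSegments.add("segment-1");

    // Snapshot taken before running anything.
    ImmutableList<String> snapshot = ImmutableList.copyOf(liveSegments);

    // A change arriving mid-cycle...
    liveSegments.add("segment-2");

    // ...is invisible to logic working from the snapshot.
    System.out.println(snapshot);     // [segment-1]
    System.out.println(liveSegments); // [segment-1, segment-2]
  }
}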

View File: DruidDataSource.java

@ -20,6 +20,8 @@
package io.druid.client;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import io.druid.timeline.DataSegment;
@ -117,4 +119,14 @@ public class DruidDataSource
", partitions=" + segmentsHolder.toString() +
'}';
}
public ImmutableDruidDataSource toImmutableDruidDataSource()
{
return new ImmutableDruidDataSource(
name,
ImmutableMap.copyOf(properties),
ImmutableMap.copyOf(partitionNames),
ImmutableSet.copyOf(segmentsHolder)
);
}
}
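
A note on the conversion above: ImmutableMap.copyOf and ImmutableSet.copyOf take point-in-time, shallow copies. Entries added to the live collections afterward are invisible to the snapshot, but the values themselves are not cloned, so the snapshot is only as immutable as its values (safe here, since DataSegment is an immutable value object). A small demonstration of the shallow-copy semantics, using hypothetical data but the real Guava API:

import com.google.common.collect.ImmutableMap;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class ShallowCopySketch
{
  public static void main(String[] args)
  {
    Map<String, AtomicLong> live = new HashMap<>();
    live.put("size", new AtomicLong(1));

    ImmutableMap<String, AtomicLong> snap = ImmutableMap.copyOf(live);

    live.put("other", new AtomicLong(2)); // a new entry: invisible to snap
    live.get("size").set(99);             // mutating a shared value: visible

    System.out.println(snap.containsKey("other")); // false
    System.out.println(snap.get("size"));          // 99, the copy is shallow
  }
}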

View File: DruidServer.java

@ -21,7 +21,9 @@ package io.druid.client;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata;
@ -273,4 +275,26 @@ public class DruidServer implements Comparable
return getName().compareTo(((DruidServer) o).getName());
}
public ImmutableDruidServer toImmutableDruidServer()
{
return new ImmutableDruidServer(
metadata,
currSize,
ImmutableMap.copyOf(
Maps.transformValues(
dataSources,
new Function<DruidDataSource, ImmutableDruidDataSource>()
{
@Override
public ImmutableDruidDataSource apply(DruidDataSource input)
{
return input.toImmutableDruidDataSource();
}
}
)
),
ImmutableMap.copyOf(segments)
);
}
}
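
Maps.transformValues returns a lazy view: the function is re-applied on every read, and the view tracks later changes to the backing map. Wrapping it in ImmutableMap.copyOf, as above, forces a single eager pass, so each DruidDataSource is converted exactly once and the result is frozen. A sketch of the difference, with hypothetical data but real Guava behavior:

import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import java.util.HashMap;
import java.util.Map;

public class TransformValuesSketch
{
  public static void main(String[] args)
  {
    Map<String, Integer> live = new HashMap<>();
    live.put("a", 1);

    // Lazy view: reflects later changes to `live`.
    Map<String, String> view = Maps.transformValues(
        live,
        new Function<Integer, String>()
        {
          @Override
          public String apply(Integer input)
          {
            return "v" + input;
          }
        }
    );

    // Eager copy: converts each value once and freezes the result.
    ImmutableMap<String, String> frozen = ImmutableMap.copyOf(view);

    live.put("b", 2);
    System.out.println(view.containsKey("b"));   // true, the view is live
    System.out.println(frozen.containsKey("b")); // false, the copy is fixed
  }
}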

View File: ImmutableDruidDataSource.java

@ -0,0 +1,75 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.timeline.DataSegment;
import java.util.Map;
import java.util.Set;
/**
*/
public class ImmutableDruidDataSource
{
private final String name;
private final ImmutableMap<String, String> properties;
private final ImmutableMap<String, DataSegment> partitionNames;
private final ImmutableSet<DataSegment> segmentsHolder;
public ImmutableDruidDataSource(
String name,
ImmutableMap<String, String> properties,
ImmutableMap<String, DataSegment> partitionNames,
ImmutableSet<DataSegment> segmentsHolder
)
{
this.name = name;
this.properties = properties;
this.partitionNames = partitionNames;
this.segmentsHolder = segmentsHolder;
}
public String getName()
{
return name;
}
public Map<String, String> getProperties()
{
return properties;
}
public Map<String, DataSegment> getPartitionNames()
{
return partitionNames;
}
public boolean isEmpty()
{
return segmentsHolder.isEmpty();
}
public Set<DataSegment> getSegments()
{
return segmentsHolder;
}
}

View File: ImmutableDruidServer.java

@ -0,0 +1,104 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client;
import com.google.common.collect.ImmutableMap;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import java.util.Map;
/**
*/
public class ImmutableDruidServer
{
private final DruidServerMetadata metadata;
private final long currSize;
private final ImmutableMap<String, ImmutableDruidDataSource> dataSources;
private final ImmutableMap<String, DataSegment> segments;
public ImmutableDruidServer(
DruidServerMetadata metadata,
long currSize,
ImmutableMap<String, ImmutableDruidDataSource> dataSources,
ImmutableMap<String, DataSegment> segments
)
{
this.metadata = metadata;
this.currSize = currSize;
this.segments = segments;
this.dataSources = dataSources;
}
public String getName()
{
return metadata.getName();
}
public DruidServerMetadata getMetadata()
{
return metadata;
}
public String getHost()
{
return metadata.getHost();
}
public long getCurrSize()
{
return currSize;
}
public long getMaxSize()
{
return metadata.getMaxSize();
}
public String getType()
{
return metadata.getType();
}
public String getTier()
{
return metadata.getTier();
}
public int getPriority()
{
return metadata.getPriority();
}
public DataSegment getSegment(String segmentName)
{
return segments.get(segmentName);
}
public Iterable<ImmutableDruidDataSource> getDataSources()
{
return dataSources.values();
}
public Map<String, DataSegment> getSegments()
{
return segments;
}
}

View File: DatabaseRuleManager.java

@ -51,7 +51,6 @@ import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
@ -125,7 +124,7 @@ public class DatabaseRuleManager
private final Supplier<DatabaseRuleManagerConfig> config;
private final Supplier<DbTablesConfig> dbTables;
private final IDBI dbi;
private final AtomicReference<ConcurrentHashMap<String, List<Rule>>> rules;
private final AtomicReference<ImmutableMap<String, List<Rule>>> rules;
private volatile ScheduledExecutorService exec;
@ -146,8 +145,8 @@ public class DatabaseRuleManager
this.dbTables = dbTables;
this.dbi = dbi;
this.rules = new AtomicReference<ConcurrentHashMap<String, List<Rule>>>(
new ConcurrentHashMap<String, List<Rule>>()
this.rules = new AtomicReference<>(
ImmutableMap.<String, List<Rule>>of()
);
}
@ -188,7 +187,7 @@ public class DatabaseRuleManager
return;
}
rules.set(new ConcurrentHashMap<String, List<Rule>>());
rules.set(ImmutableMap.<String, List<Rule>>of());
started = false;
exec.shutdownNow();
@ -199,7 +198,7 @@ public class DatabaseRuleManager
public void poll()
{
try {
ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<String, List<Rule>>(
ImmutableMap<String, List<Rule>> newRules = ImmutableMap.copyOf(
dbi.withHandle(
new HandleCallback<Map<String, List<Rule>>>()
{
@ -309,12 +308,6 @@ public class DatabaseRuleManager
}
}
);
ConcurrentHashMap<String, List<Rule>> existingRules = rules.get();
if (existingRules == null) {
existingRules = new ConcurrentHashMap<String, List<Rule>>();
}
existingRules.put(dataSource, newRules);
}
catch (Exception e) {
log.error(e, String.format("Exception while overriding rule for %s", dataSource));
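
The pattern adopted here: the rules live in an AtomicReference holding an ImmutableMap, poll() assembles an entirely new map, and one set() publishes it. Readers observe either the whole old rule set or the whole new one, never the partially-updated state that per-key writes into a shared ConcurrentHashMap previously allowed. A minimal sketch, with a hypothetical rule type standing in for List<Rule>:

import com.google.common.collect.ImmutableMap;

import java.util.concurrent.atomic.AtomicReference;

public class RuleSwapSketch
{
  private final AtomicReference<ImmutableMap<String, String>> rules =
      new AtomicReference<>(ImmutableMap.<String, String>of());

  void poll(ImmutableMap<String, String> freshlyLoaded)
  {
    rules.set(freshlyLoaded); // one atomic publish of the whole snapshot
  }

  String ruleFor(String dataSource)
  {
    return rules.get().get(dataSource); // consistent view, no locking
  }

  public static void main(String[] args)
  {
    RuleSwapSketch manager = new RuleSwapSketch();
    manager.poll(ImmutableMap.of("wikipedia", "loadForever"));
    System.out.println(manager.ruleFor("wikipedia")); // loadForever
  }
}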

View File: DruidServerMetadata.java

@ -87,6 +87,52 @@ public class DruidServerMetadata
return priority;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DruidServerMetadata metadata = (DruidServerMetadata) o;
if (maxSize != metadata.maxSize) {
return false;
}
if (priority != metadata.priority) {
return false;
}
if (host != null ? !host.equals(metadata.host) : metadata.host != null) {
return false;
}
if (name != null ? !name.equals(metadata.name) : metadata.name != null) {
return false;
}
if (tier != null ? !tier.equals(metadata.tier) : metadata.tier != null) {
return false;
}
if (type != null ? !type.equals(metadata.type) : metadata.type != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (host != null ? host.hashCode() : 0);
result = 31 * result + (int) (maxSize ^ (maxSize >>> 32));
result = 31 * result + (tier != null ? tier.hashCode() : 0);
result = 31 * result + (type != null ? type.hashCode() : 0);
result = 31 * result + priority;
return result;
}
@Override
public String toString()
{

View File: BalancerSegmentHolder.java

@ -19,21 +19,21 @@
package io.druid.server.coordinator;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.timeline.DataSegment;
/**
*/
public class BalancerSegmentHolder
{
private final DruidServer fromServer;
private final ImmutableDruidServer fromServer;
private final DataSegment segment;
// This is a pretty fugly hard coding of the maximum lifetime
private volatile int lifetime = 15;
public BalancerSegmentHolder(
DruidServer fromServer,
ImmutableDruidServer fromServer,
DataSegment segment
)
{
@ -41,7 +41,7 @@ public class BalancerSegmentHolder
this.segment = segment;
}
public DruidServer getFromServer()
public ImmutableDruidServer getFromServer()
{
return fromServer;
}
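
The lifetime field bounds how long a pending move is tracked. A minimal sketch, assuming (this diff does not show it) that the balancer decrements the holder once per coordinator run and gives up once it hits zero:

public class LifetimeSketch
{
  private int lifetime = 15; // mirrors the hard-coded maximum above

  boolean tick()
  {
    lifetime--;
    return lifetime > 0; // false: stop tracking this move
  }

  public static void main(String[] args)
  {
    LifetimeSketch holder = new LifetimeSketch();
    int cycles = 0;
    while (holder.tick()) {
      cycles++;
    }
    System.out.println("gave up after " + (cycles + 1) + " cycles"); // 15
  }
}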

View File: DruidCluster.java

@ -22,7 +22,7 @@ package io.druid.server.coordinator;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import java.util.Map;
@ -46,7 +46,7 @@ public class DruidCluster
public void add(ServerHolder serverHolder)
{
DruidServer server = serverHolder.getServer();
ImmutableDruidServer server = serverHolder.getServer();
MinMaxPriorityQueue<ServerHolder> tierServers = cluster.get(server.getTier());
if (tierServers == null) {
tierServers = MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create();

View File: DruidCoordinator.java

@ -31,12 +31,10 @@ import com.google.common.io.Closeables;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ExecutorServices;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
@ -44,6 +42,8 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.client.ServerInventoryView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.collections.CountingMap;
@ -342,52 +342,53 @@ public class DruidCoordinator
}
}
public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback)
public void moveSegment(
ImmutableDruidServer fromServer,
ImmutableDruidServer toServer,
String segmentName,
final LoadPeonCallback callback
)
{
try {
final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
if (fromServer == null) {
throw new IAE("Unable to find server [%s]", from);
}
final DruidServer toServer = serverInventoryView.getInventoryValue(to);
if (toServer == null) {
throw new IAE("Unable to find server [%s]", to);
}
if (to.equalsIgnoreCase(from)) {
throw new IAE("Redundant command to move segment [%s] from [%s] to [%s]", segmentName, from, to);
if (fromServer.getMetadata().equals(toServer.getMetadata())) {
throw new IAE("Cannot move [%s] to and from the same server [%s]", segmentName, fromServer.getName());
}
final DataSegment segment = fromServer.getSegment(segmentName);
if (segment == null) {
throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from);
throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, fromServer.getName());
}
final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
final LoadQueuePeon loadPeon = loadManagementPeons.get(toServer.getName());
if (loadPeon == null) {
throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", to);
throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", toServer.getName());
}
final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
final LoadQueuePeon dropPeon = loadManagementPeons.get(fromServer.getName());
if (dropPeon == null) {
throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from);
throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", fromServer.getName());
}
final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
if (toHolder.getAvailableSize() < segment.getSize()) {
throw new IAE(
"Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
to,
toServer.getName(),
segment,
segment.getSize(),
toHolder.getAvailableSize()
);
}
final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName);
final String toLoadQueueSegPath = ZKPaths.makePath(
ZKPaths.makePath(
zkPaths.getLoadQueuePath(),
toServer.getName()
), segmentName
);
final String toServedSegPath = ZKPaths.makePath(
ZKPaths.makePath(serverInventoryView.getInventoryManagerConfig().getInventoryPath(), to), segmentName
ZKPaths.makePath(serverInventoryView.getInventoryManagerConfig().getInventoryPath(), toServer.getName()),
segmentName
);
loadPeon.loadSegment(
@ -415,35 +416,9 @@ public class DruidCoordinator
}
catch (Exception e) {
log.makeAlert(e, "Exception moving segment %s", segmentName).emit();
callback.execute();
}
}
public void dropSegment(String from, String segmentName, final LoadPeonCallback callback)
{
try {
final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
if (fromServer == null) {
throw new IAE("Unable to find server [%s]", from);
if (callback != null) {
callback.execute();
}
final DataSegment segment = fromServer.getSegment(segmentName);
if (segment == null) {
throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from);
}
final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
if (dropPeon == null) {
throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from);
}
if (!dropPeon.getSegmentsToDrop().contains(segment)) {
dropPeon.dropSegment(segment, callback);
}
}
catch (Exception e) {
log.makeAlert(e, "Exception dropping segment %s", segmentName).emit();
callback.execute();
}
}
@ -561,9 +536,15 @@ public class DruidCoordinator
databaseRuleManager.start();
serverInventoryView.start();
serviceAnnouncer.announce(self);
final int startingLeaderCounter = leaderCounter;
final List<Pair<? extends CoordinatorRunnable, Duration>> coordinatorRunnables = Lists.newArrayList();
coordinatorRunnables.add(Pair.of(new CoordinatorHistoricalManagerRunnable(), config.getCoordinatorPeriod()));
coordinatorRunnables.add(
Pair.of(
new CoordinatorHistoricalManagerRunnable(startingLeaderCounter),
config.getCoordinatorPeriod()
)
);
if (indexingServiceClient != null) {
coordinatorRunnables.add(
Pair.of(
@ -573,14 +554,14 @@ public class DruidCoordinator
DatasourceWhitelist.CONFIG_KEY,
DatasourceWhitelist.class
)
)
),
startingLeaderCounter
),
config.getCoordinatorIndexingPeriod()
)
);
}
final int startingLeaderCounter = leaderCounter;
for (final Pair<? extends CoordinatorRunnable, Duration> coordinatorRunnable : coordinatorRunnables) {
ScheduledExecutors.scheduleWithFixedDelay(
exec,
@ -724,10 +705,12 @@ public class DruidCoordinator
{
private final long startTime = System.currentTimeMillis();
private final List<DruidCoordinatorHelper> helpers;
private final int startingLeaderCounter;
protected CoordinatorRunnable(List<DruidCoordinatorHelper> helpers)
protected CoordinatorRunnable(List<DruidCoordinatorHelper> helpers, final int startingLeaderCounter)
{
this.helpers = helpers;
this.startingLeaderCounter = startingLeaderCounter;
}
@Override
@ -769,7 +752,10 @@ public class DruidCoordinator
.build();
for (DruidCoordinatorHelper helper : helpers) {
params = helper.run(params);
// Don't read state and run state in the same helper, otherwise race conditions may exist
if (leader && startingLeaderCounter == leaderCounter) {
params = helper.run(params);
}
}
}
catch (Exception e) {
@ -780,7 +766,7 @@ public class DruidCoordinator
private class CoordinatorHistoricalManagerRunnable extends CoordinatorRunnable
{
private CoordinatorHistoricalManagerRunnable()
public CoordinatorHistoricalManagerRunnable(final int startingLeaderCounter)
{
super(
ImmutableList.of(
@ -791,7 +777,7 @@ public class DruidCoordinator
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
// Display info about all historical servers
Iterable<DruidServer> servers = FunctionalIterable
Iterable<ImmutableDruidServer> servers = FunctionalIterable
.create(serverInventoryView.getInventory())
.filter(
new Predicate<DruidServer>()
@ -804,14 +790,23 @@ public class DruidCoordinator
return input.isAssignable();
}
}
).transform(
new Function<DruidServer, ImmutableDruidServer>()
{
@Override
public ImmutableDruidServer apply(DruidServer input)
{
return input.toImmutableDruidServer();
}
}
);
if (log.isDebugEnabled()) {
log.debug("Servers");
for (DruidServer druidServer : servers) {
for (ImmutableDruidServer druidServer : servers) {
log.debug(" %s", druidServer);
log.debug(" -- DataSources");
for (DruidDataSource druidDataSource : druidServer.getDataSources()) {
for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
log.debug(" %s", druidDataSource);
}
}
@ -819,7 +814,7 @@ public class DruidCoordinator
// Find all historical servers, group them by subType and sort by ascending usage
final DruidCluster cluster = new DruidCluster();
for (DruidServer server : servers) {
for (ImmutableDruidServer server : servers) {
if (!loadManagementPeons.containsKey(server.getName())) {
String basePath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName());
LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(basePath);
@ -835,7 +830,7 @@ public class DruidCoordinator
// Stop peons for servers that aren't there anymore.
final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());
for (DruidServer server : servers) {
for (ImmutableDruidServer server : servers) {
disappeared.remove(server.getName());
}
for (String name : disappeared) {
@ -857,16 +852,17 @@ public class DruidCoordinator
new DruidCoordinatorCleanup(DruidCoordinator.this),
new DruidCoordinatorBalancer(DruidCoordinator.this),
new DruidCoordinatorLogger()
)
),
startingLeaderCounter
);
}
}
private class CoordinatorIndexingServiceRunnable extends CoordinatorRunnable
{
private CoordinatorIndexingServiceRunnable(List<DruidCoordinatorHelper> helpers)
public CoordinatorIndexingServiceRunnable(List<DruidCoordinatorHelper> helpers, final int startingLeaderCounter)
{
super(helpers);
super(helpers, startingLeaderCounter);
}
}
}
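
The startingLeaderCounter plumbing is a leadership-epoch guard: each runnable captures the counter when it is created, and before every helper it re-checks both that this coordinator is still the leader and that the counter has not advanced. A runnable scheduled under an earlier term therefore stops doing work even if leadership was lost and regained in between. A self-contained sketch with hypothetical names:

public class EpochGuardSketch
{
  private volatile boolean leader = false;
  private volatile int leaderCounter = 0;

  void becomeLeader()
  {
    leaderCounter++;
    leader = true;
  }

  void stopBeingLeader()
  {
    leader = false;
  }

  Runnable newCycle()
  {
    final int startingLeaderCounter = leaderCounter; // captured at creation
    return new Runnable()
    {
      @Override
      public void run()
      {
        // Re-checked before each step; a stale term skips all work.
        if (leader && startingLeaderCounter == leaderCounter) {
          System.out.println("running under term " + startingLeaderCounter);
        } else {
          System.out.println("stale term, skipping");
        }
      }
    };
  }

  public static void main(String[] args)
  {
    EpochGuardSketch c = new EpochGuardSketch();
    c.becomeLeader();
    Runnable cycle = c.newCycle();
    cycle.run();       // running under term 1
    c.stopBeingLeader();
    c.becomeLeader();  // regained: counter is now 2
    cycle.run();       // stale term, skipping
  }
}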

View File: SegmentReplicantLookup.java

@ -23,7 +23,7 @@ import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Table;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.timeline.DataSegment;
import java.util.Map;
@ -40,7 +40,7 @@ public class SegmentReplicantLookup
for (MinMaxPriorityQueue<ServerHolder> serversByType : cluster.getSortedServersByTier()) {
for (ServerHolder serverHolder : serversByType) {
DruidServer server = serverHolder.getServer();
ImmutableDruidServer server = serverHolder.getServer();
for (DataSegment segment : server.getSegments().values()) {
Integer numReplicants = segmentsInCluster.get(segment.getIdentifier(), server.getTier());

View File: ServerHolder.java

@ -21,6 +21,7 @@ package io.druid.server.coordinator;
import com.metamx.common.logger.Logger;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.timeline.DataSegment;
/**
@ -28,11 +29,11 @@ import io.druid.timeline.DataSegment;
public class ServerHolder implements Comparable<ServerHolder>
{
private static final Logger log = new Logger(ServerHolder.class);
private final DruidServer server;
private final ImmutableDruidServer server;
private final LoadQueuePeon peon;
public ServerHolder(
DruidServer server,
ImmutableDruidServer server,
LoadQueuePeon peon
)
{
@ -40,7 +41,7 @@ public class ServerHolder implements Comparable<ServerHolder>
this.peon = peon;
}
public DruidServer getServer()
public ImmutableDruidServer getServer()
{
return server;
}

View File: DruidCoordinatorBalancer.java

@ -25,6 +25,7 @@ import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.guava.Comparators;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.server.coordinator.BalancerSegmentHolder;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
@ -149,21 +150,20 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
protected void moveSegment(
final BalancerSegmentHolder segment,
final DruidServer toServer,
final ImmutableDruidServer toServer,
final DruidCoordinatorRuntimeParams params
)
{
final String toServerName = toServer.getName();
final LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServerName);
final LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServer.getName());
final String fromServerName = segment.getFromServer().getName();
final ImmutableDruidServer fromServer = segment.getFromServer();
final DataSegment segmentToMove = segment.getSegment();
final String segmentName = segmentToMove.getIdentifier();
if (!toPeon.getSegmentsToLoad().contains(segmentToMove) &&
(toServer.getSegment(segmentName) == null) &&
new ServerHolder(toServer, toPeon).getAvailableSize() > segmentToMove.getSize()) {
log.info("Moving [%s] from [%s] to [%s]", segmentName, fromServerName, toServerName);
log.info("Moving [%s] from [%s] to [%s]", segmentName, fromServer.getName(), toServer.getName());
LoadPeonCallback callback = null;
try {
@ -180,8 +180,8 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
}
};
coordinator.moveSegment(
fromServerName,
toServerName,
fromServer,
toServer,
segmentToMove.getIdentifier(),
callback
);

View File: DruidCoordinatorCleanup.java

@ -25,6 +25,8 @@ import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;
@ -32,7 +34,6 @@ import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ServerHolder;
import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -65,9 +66,9 @@ public class DruidCoordinatorCleanup implements DruidCoordinatorHelper
// Drop segments that no longer exist in the available segments configuration
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) {
for (ServerHolder serverHolder : serverHolders) {
DruidServer server = serverHolder.getServer();
ImmutableDruidServer server = serverHolder.getServer();
for (DruidDataSource dataSource : server.getDataSources()) {
for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
for (DataSegment segment : dataSource.getSegments()) {
if (!availableSegments.contains(segment)) {
LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());
@ -97,9 +98,9 @@ public class DruidCoordinatorCleanup implements DruidCoordinatorHelper
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) {
for (ServerHolder serverHolder : serverHolders) {
DruidServer server = serverHolder.getServer();
ImmutableDruidServer server = serverHolder.getServer();
for (DruidDataSource dataSource : server.getDataSources()) {
for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSource.getName());
if (timeline == null) {
timeline = new VersionedIntervalTimeline<String, DataSegment>(Comparators.comparable());

View File: DruidCoordinatorLogger.java

@ -25,7 +25,7 @@ import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.collections.CountingMap;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
@ -156,7 +156,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
log.info("Load Queues:");
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) {
for (ServerHolder serverHolder : serverHolders) {
DruidServer server = serverHolder.getServer();
ImmutableDruidServer server = serverHolder.getServer();
LoadQueuePeon queuePeon = serverHolder.getPeon();
log.info(
"Server[%s, %s, %s] has %,d left to load, %,d left to drop, %,d bytes queued, %,d bytes served.",

View File: DruidCoordinatorRuleRunner.java

@ -76,8 +76,8 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
}
DruidCoordinatorRuntimeParams paramsWithReplicationManager = params.buildFromExisting()
.withReplicationManager(replicatorThrottler)
.build();
.withReplicationManager(replicatorThrottler)
.build();
// Run through all matched rules for available segments
DateTime now = new DateTime();
@ -94,10 +94,8 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
}
if (!foundMatchingRule) {
log.makeAlert(
"Unable to find a matching rule for dataSource[%s]",
segment.getDataSource()
)
log.makeAlert("Unable to find a matching rule!")
.addData("dataSource", segment.getDataSource())
.addData("segment", segment.getIdentifier())
.emit();
}
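
The alert rewrite follows a common alerting convention: keep the message constant and attach the variable parts (dataSource, segment) as structured data, so identical alerts can be grouped downstream rather than fanning out into one message per dataSource. A sketch of the shape, with a hypothetical alert helper rather than the emitter API:

import java.util.HashMap;
import java.util.Map;

public class AlertSketch
{
  // Hypothetical stand-in for an alert event: constant description,
  // variable details carried as structured data.
  static Map<String, Object> makeAlert(String description, Map<String, Object> data)
  {
    Map<String, Object> alert = new HashMap<>();
    alert.put("description", description);
    alert.put("data", data);
    return alert;
  }

  public static void main(String[] args)
  {
    Map<String, Object> data = new HashMap<>();
    data.put("dataSource", "wikipedia");
    data.put("segment", "wikipedia_2014-01-01_2014-01-02_v1");

    // Same description for every dataSource, so alerts group cleanly.
    System.out.println(makeAlert("Unable to find a matching rule!", data));
  }
}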

View File: CostBalancerStrategyTest.java

@ -21,7 +21,7 @@ package io.druid.server.coordination;
import com.google.api.client.util.Lists;
import com.google.common.collect.Maps;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CostBalancerStrategy;
import io.druid.server.coordinator.LoadQueuePeonTester;
@ -52,7 +52,7 @@ public class CostBalancerStrategyTest
// Each having 100 segments
for (int i = 0; i < serverCount; i++) {
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
DruidServer druidServer = EasyMock.createMock(DruidServer.class);
ImmutableDruidServer druidServer = EasyMock.createMock(ImmutableDruidServer.class);
EasyMock.expect(druidServer.getName()).andReturn("DruidServer_Name_" + i).anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes();
EasyMock.expect(druidServer.getMaxSize()).andReturn(10000000L).anyTimes();
@ -73,7 +73,7 @@ public class CostBalancerStrategyTest
// The best server to be available for next segment assignment has only 98 Segments
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
DruidServer druidServer = EasyMock.createMock(DruidServer.class);
ImmutableDruidServer druidServer = EasyMock.createMock(ImmutableDruidServer.class);
EasyMock.expect(druidServer.getName()).andReturn("BEST_SERVER").anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes();
EasyMock.expect(druidServer.getMaxSize()).andReturn(10000000L).anyTimes();
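
These tests swap the mocked type from DruidServer to ImmutableDruidServer but keep EasyMock's record/replay lifecycle: create the mock, record expectations with expect(...).andReturn(...), switch to replay mode, then exercise it. A compact, self-contained sketch, using a hypothetical interface but the real EasyMock API:

import org.easymock.EasyMock;

public class MockSketch
{
  interface Server
  {
    String getName();
    long getCurrSize();
  }

  public static void main(String[] args)
  {
    Server server = EasyMock.createMock(Server.class);
    EasyMock.expect(server.getName()).andReturn("BEST_SERVER").anyTimes();
    EasyMock.expect(server.getCurrSize()).andReturn(3000L).anyTimes();
    EasyMock.replay(server); // switch from record mode to replay mode

    System.out.println(server.getName());     // BEST_SERVER
    System.out.println(server.getCurrSize()); // 3000

    EasyMock.verify(server); // passes: anyTimes() puts no count constraint
  }
}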

View File: DruidCoordinatorBalancerProfiler.java

@ -28,6 +28,7 @@ import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.db.DatabaseRuleManager;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.rules.PeriodLoadRule;
@ -49,8 +50,8 @@ public class DruidCoordinatorBalancerProfiler
{
private static final int MAX_SEGMENTS_TO_MOVE = 5;
private DruidCoordinator coordinator;
private DruidServer druidServer1;
private DruidServer druidServer2;
private ImmutableDruidServer druidServer1;
private ImmutableDruidServer druidServer2;
Map<String, DataSegment> segments = Maps.newHashMap();
ServiceEmitter emitter;
DatabaseRuleManager manager;
@ -61,8 +62,8 @@ public class DruidCoordinatorBalancerProfiler
public void setUp() throws Exception
{
coordinator = EasyMock.createMock(DruidCoordinator.class);
druidServer1 = EasyMock.createMock(DruidServer.class);
druidServer2 = EasyMock.createMock(DruidServer.class);
druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer2 = EasyMock.createMock(ImmutableDruidServer.class);
emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
manager = EasyMock.createMock(DatabaseRuleManager.class);
@ -79,8 +80,8 @@ public class DruidCoordinatorBalancerProfiler
EasyMock.replay(manager);
coordinator.moveSegment(
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<LoadPeonCallback>anyObject()
);
@ -109,7 +110,7 @@ public class DruidCoordinatorBalancerProfiler
}
for (int i = 0; i < numServers; i++) {
DruidServer server = EasyMock.createMock(DruidServer.class);
ImmutableDruidServer server = EasyMock.createMock(ImmutableDruidServer.class);
EasyMock.expect(server.getMetadata()).andReturn(null).anyTimes();
EasyMock.expect(server.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(server.getMaxSize()).andReturn(100L).atLeastOnce();
@ -203,8 +204,8 @@ public class DruidCoordinatorBalancerProfiler
EasyMock.replay(druidServer2);
coordinator.moveSegment(
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<LoadPeonCallback>anyObject()
);

View File: DruidCoordinatorBalancerTest.java

@ -23,7 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
@ -44,10 +44,10 @@ public class DruidCoordinatorBalancerTest
{
private static final int MAX_SEGMENTS_TO_MOVE = 5;
private DruidCoordinator coordinator;
private DruidServer druidServer1;
private DruidServer druidServer2;
private DruidServer druidServer3;
private DruidServer druidServer4;
private ImmutableDruidServer druidServer1;
private ImmutableDruidServer druidServer2;
private ImmutableDruidServer druidServer3;
private ImmutableDruidServer druidServer4;
private DataSegment segment1;
private DataSegment segment2;
private DataSegment segment3;
@ -58,10 +58,10 @@ public class DruidCoordinatorBalancerTest
public void setUp() throws Exception
{
coordinator = EasyMock.createMock(DruidCoordinator.class);
druidServer1 = EasyMock.createMock(DruidServer.class);
druidServer2 = EasyMock.createMock(DruidServer.class);
druidServer3 = EasyMock.createMock(DruidServer.class);
druidServer4 = EasyMock.createMock(DruidServer.class);
druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer2 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer3 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer4 = EasyMock.createMock(ImmutableDruidServer.class);
segment1 = EasyMock.createMock(DataSegment.class);
segment2 = EasyMock.createMock(DataSegment.class);
segment3 = EasyMock.createMock(DataSegment.class);
@ -156,8 +156,8 @@ public class DruidCoordinatorBalancerTest
// Mock stuff that the coordinator needs
coordinator.moveSegment(
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<LoadPeonCallback>anyObject()
);
@ -233,8 +233,8 @@ public class DruidCoordinatorBalancerTest
// Mock stuff that the coordinator needs
coordinator.moveSegment(
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<LoadPeonCallback>anyObject()
);
@ -317,8 +317,8 @@ public class DruidCoordinatorBalancerTest
// Mock stuff that the coordinator needs
coordinator.moveSegment(
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<LoadPeonCallback>anyObject()
);

View File: DruidCoordinatorBalancerTester.java

@ -19,7 +19,7 @@
package io.druid.server.coordinator;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.server.coordinator.helper.DruidCoordinatorBalancer;
import io.druid.timeline.DataSegment;
@ -33,7 +33,7 @@ public class DruidCoordinatorBalancerTester extends DruidCoordinatorBalancer
@Override
protected void moveSegment(
final BalancerSegmentHolder segment,
final DruidServer toServer,
final ImmutableDruidServer toServer,
final DruidCoordinatorRuntimeParams params
)
{

View File: DruidCoordinatorRuleRunnerTest.java

@ -135,7 +135,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -151,7 +151,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"normal",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -167,7 +167,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"cold",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -233,7 +233,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
@ -244,7 +244,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -260,7 +260,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"cold",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -337,7 +337,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -346,7 +346,7 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
normServer,
normServer.toImmutableDruidServer(),
mockPeon
)
)
@ -410,7 +410,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"normal",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -461,7 +461,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"normal",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -522,7 +522,7 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server,
server.toImmutableDruidServer(),
mockPeon
)
)
@ -594,11 +594,11 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
server1.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
server2,
server2.toImmutableDruidServer(),
mockPeon
)
)
@ -672,7 +672,7 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
server1.toImmutableDruidServer(),
mockPeon
)
)
@ -681,7 +681,7 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server2,
server2.toImmutableDruidServer(),
mockPeon
)
)
@ -751,7 +751,7 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
server1.toImmutableDruidServer(),
mockPeon
)
)
@ -760,7 +760,7 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server2,
server2.toImmutableDruidServer(),
mockPeon
)
)
@ -844,15 +844,15 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
server1.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
server2,
server2.toImmutableDruidServer(),
anotherMockPeon
),
new ServerHolder(
server3,
server3.toImmutableDruidServer(),
anotherMockPeon
)
)
@ -915,7 +915,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
@ -926,7 +926,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -1023,7 +1023,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -1039,7 +1039,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
DruidServer.DEFAULT_TIER,
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -1126,11 +1126,11 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
server1.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
server2,
server2.toImmutableDruidServer(),
mockPeon
)
)

View File: DruidCoordinatorTest.java

@ -19,14 +19,17 @@
package io.druid.server.coordinator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.client.SingleServerInventoryView;
import io.druid.curator.discovery.NoopServiceAnnouncer;
import io.druid.curator.inventory.InventoryManagerConfig;
import io.druid.db.DatabaseSegmentManager;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
@ -96,7 +99,8 @@ public class DruidCoordinatorTest
return null;
}
},
new ZkPathsConfig(){
new ZkPathsConfig()
{
@Override
public String getZkBasePath()
@ -130,36 +134,43 @@ public class DruidCoordinatorTest
@Test
public void testMoveSegment() throws Exception
{
EasyMock.expect(druidServer.getSegment("dummySegment")).andReturn(segment);
EasyMock.expect(druidServer.getMaxSize()).andReturn(new Long(5));
EasyMock.expect(druidServer.getCurrSize()).andReturn(new Long(1)).atLeastOnce();
EasyMock.expect(druidServer.getName()).andReturn("blah");
EasyMock.expect(druidServer.toImmutableDruidServer()).andReturn(
new ImmutableDruidServer(
new DruidServerMetadata("blah", null, 5L, null, null, 0),
1L,
null,
ImmutableMap.of("dummySegment", segment)
)
).atLeastOnce();
EasyMock.replay(druidServer);
loadManagementPeons.put("to", loadQueuePeon);
loadManagementPeons.put("from", loadQueuePeon);
loadManagementPeons.put("blah", loadQueuePeon);
EasyMock.expect(loadQueuePeon.getLoadQueueSize()).andReturn(new Long(1));
EasyMock.replay(loadQueuePeon);
EasyMock.expect(serverInventoryView.getInventoryValue("from")).andReturn(druidServer);
EasyMock.expect(serverInventoryView.getInventoryValue("to")).andReturn(druidServer);
EasyMock.expect(serverInventoryView.getInventoryManagerConfig()).andReturn(new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return "";
}
EasyMock.expect(serverInventoryView.getInventoryManagerConfig()).andReturn(
new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return "";
}
@Override
public String getInventoryPath()
{
return "";
}
});
@Override
public String getInventoryPath()
{
return "";
}
}
);
EasyMock.replay(serverInventoryView);
coordinator.moveSegment("from", "to", "dummySegment", null);
coordinator.moveSegment(
druidServer.toImmutableDruidServer(),
druidServer.toImmutableDruidServer(),
"dummySegment", null
);
}
}

View File: ReservoirSegmentSamplerTest.java

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
@ -37,10 +38,10 @@ import java.util.Map;
public class ReservoirSegmentSamplerTest
{
private DruidServer druidServer1;
private DruidServer druidServer2;
private DruidServer druidServer3;
private DruidServer druidServer4;
private ImmutableDruidServer druidServer1;
private ImmutableDruidServer druidServer2;
private ImmutableDruidServer druidServer3;
private ImmutableDruidServer druidServer4;
private ServerHolder holder1;
private ServerHolder holder2;
@ -60,11 +61,11 @@ public class ReservoirSegmentSamplerTest
@Before
public void setUp() throws Exception
{
druidServer1 = EasyMock.createMock(DruidServer.class);
druidServer1 = EasyMock.createMock(DruidServer.class);
druidServer2 = EasyMock.createMock(DruidServer.class);
druidServer3 = EasyMock.createMock(DruidServer.class);
druidServer4 = EasyMock.createMock(DruidServer.class);
druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer2 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer3 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer4 = EasyMock.createMock(ImmutableDruidServer.class);
holder1 = EasyMock.createMock(ServerHolder.class);
holder2 = EasyMock.createMock(ServerHolder.class);
holder3 = EasyMock.createMock(ServerHolder.class);

View File: LoadRuleTest.java

@ -143,7 +143,7 @@ public class LoadRuleTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -159,7 +159,7 @@ public class LoadRuleTest
"historical",
DruidServer.DEFAULT_TIER,
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -252,7 +252,7 @@ public class LoadRuleTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
server1.toImmutableDruidServer(),
mockPeon
)
)
@ -261,7 +261,7 @@ public class LoadRuleTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server2,
server2.toImmutableDruidServer(),
mockPeon
)
)