snapshot coordinator state before running

This commit is contained in:
fjy 2014-05-18 22:32:20 -07:00
parent b41b80fab4
commit 5ee3f5c9d3
20 changed files with 339 additions and 104 deletions

View File

@ -117,4 +117,9 @@ public class DruidDataSource
", partitions=" + segmentsHolder.toString() +
'}';
}
/**
 * Returns an immutable snapshot view of this data source for the coordinator.
 *
 * NOTE(review): the maps and the segment set are wrapped, not copied, so the
 * returned "immutable" view still reflects concurrent mutations of this
 * DruidDataSource — confirm that is intended for snapshotting.
 */
public ImmutableDruidDataSource toImmutableDruidDataSource()
{
return new ImmutableDruidDataSource(name, properties, partitionNames, Collections.unmodifiableSet(segmentsHolder));
}
}

View File

@ -21,12 +21,15 @@ package io.druid.client;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -273,4 +276,26 @@ public class DruidServer implements Comparable
return getName().compareTo(((DruidServer) o).getName());
}
/**
 * Captures an immutable point-in-time snapshot of this server's state.
 *
 * The previous implementation returned {@code Collections.unmodifiableMap}
 * over a lazy {@code Maps.transformValues} view of the live {@code dataSources}
 * map; that view silently tracked concurrent mutations and re-ran the
 * transform on every access, so the "snapshot" was neither stable nor cheap to
 * read. {@code ImmutableMap.copyOf} materializes both maps once, giving the
 * coordinator a true snapshot to run against.
 *
 * @return a stable, deeply read-only copy of this server's metadata,
 *         data sources and segment map as of the time of the call
 */
public ImmutableDruidServer toImmutableDruidServer()
{
  return new ImmutableDruidServer(
      metadata,
      currSize,
      ImmutableMap.copyOf(
          Maps.transformValues(
              dataSources,
              new Function<DruidDataSource, ImmutableDruidDataSource>()
              {
                @Override
                public ImmutableDruidDataSource apply(DruidDataSource input)
                {
                  return input.toImmutableDruidDataSource();
                }
              }
          )
      ),
      ImmutableMap.copyOf(segments)
  );
}
}

View File

@ -0,0 +1,74 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client;
import io.druid.timeline.DataSegment;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
/**
*/
/**
 * An immutable snapshot view of a data source, handed to the coordinator so
 * its run operates on stable state.
 *
 * NOTE(review): the constructor wraps but does not copy the supplied
 * collections; true immutability therefore depends on callers no longer
 * mutating what they pass in (or passing copies) — confirm against callers.
 */
public class ImmutableDruidDataSource
{
  private final String name;
  private final Map<String, String> properties;
  private final Map<String, DataSegment> partitionNames;
  private final Set<DataSegment> segmentsHolder;

  public ImmutableDruidDataSource(
      String name,
      Map<String, String> properties,
      Map<String, DataSegment> partitionNames,
      Set<DataSegment> segmentsHolder
  )
  {
    this.name = name;
    // Wrap once here rather than allocating a fresh unmodifiable view on
    // every getter call (these getters are hit in coordinator loops).
    this.properties = Collections.unmodifiableMap(properties);
    this.partitionNames = Collections.unmodifiableMap(partitionNames);
    this.segmentsHolder = Collections.unmodifiableSet(segmentsHolder);
  }

  public String getName()
  {
    return name;
  }

  /** @return read-only view of the data source's properties */
  public Map<String, String> getProperties()
  {
    return properties;
  }

  /** @return read-only map of partition name to segment */
  public Map<String, DataSegment> getPartitionNames()
  {
    return partitionNames;
  }

  public boolean isEmpty()
  {
    return segmentsHolder.isEmpty();
  }

  /** @return read-only view of all segments in this data source */
  public Set<DataSegment> getSegments()
  {
    return segmentsHolder;
  }

  @Override
  public String toString()
  {
    // Mirrors DruidDataSource's toString style for consistent log output.
    return "ImmutableDruidDataSource{" +
           "name='" + name + '\'' +
           '}';
  }
}

View File

@ -0,0 +1,107 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Iterables;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import java.util.Collections;
import java.util.Map;
/**
*/
/**
 * An immutable snapshot of a Druid server's state (metadata, size, data
 * sources and served segments), used by the coordinator so that a single run
 * sees consistent values.
 *
 * NOTE(review): {@code dataSources} and {@code segments} are stored as given;
 * immutability depends on callers passing maps that are no longer mutated.
 */
public class ImmutableDruidServer
{
  private final DruidServerMetadata metadata;
  private final long currSize;
  private final Map<String, ImmutableDruidDataSource> dataSources;
  private final Map<String, DataSegment> segments;

  public ImmutableDruidServer(
      DruidServerMetadata metadata,
      long currSize,
      Map<String, ImmutableDruidDataSource> dataSources,
      Map<String, DataSegment> segments
  )
  {
    this.metadata = metadata;
    this.currSize = currSize;
    this.dataSources = dataSources;
    // Wrap once here so getSegments() does not allocate a new view per call;
    // it is invoked inside per-segment coordinator loops.
    this.segments = Collections.unmodifiableMap(segments);
  }

  public String getName()
  {
    return metadata.getName();
  }

  public DruidServerMetadata getMetadata()
  {
    return metadata;
  }

  public String getHost()
  {
    return metadata.getHost();
  }

  /** @return bytes currently served, as captured at snapshot time */
  public long getCurrSize()
  {
    return currSize;
  }

  // NOTE(review): this is the only getter marked @JsonProperty — confirm
  // whether the other fields were meant to be serialized as well.
  @JsonProperty
  public long getMaxSize()
  {
    return metadata.getMaxSize();
  }

  public String getType()
  {
    return metadata.getType();
  }

  public String getTier()
  {
    return metadata.getTier();
  }

  public int getPriority()
  {
    return metadata.getPriority();
  }

  /** @return the served segment with the given identifier, or null if absent */
  public DataSegment getSegment(String segmentName)
  {
    return segments.get(segmentName);
  }

  /** @return read-only iterable over this server's data source snapshots */
  public Iterable<ImmutableDruidDataSource> getDataSources()
  {
    return Iterables.unmodifiableIterable(dataSources.values());
  }

  /** @return read-only map of segment identifier to segment */
  public Map<String, DataSegment> getSegments()
  {
    return segments;
  }
}

View File

@ -19,21 +19,21 @@
package io.druid.server.coordinator;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.timeline.DataSegment;
/**
*/
public class BalancerSegmentHolder
{
private final DruidServer fromServer;
private final ImmutableDruidServer fromServer;
private final DataSegment segment;
// This is a pretty fugly hard coding of the maximum lifetime
private volatile int lifetime = 15;
public BalancerSegmentHolder(
DruidServer fromServer,
ImmutableDruidServer fromServer,
DataSegment segment
)
{
@ -41,7 +41,7 @@ public class BalancerSegmentHolder
this.segment = segment;
}
public DruidServer getFromServer()
public ImmutableDruidServer getFromServer()
{
return fromServer;
}

View File

@ -22,7 +22,7 @@ package io.druid.server.coordinator;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import java.util.Map;
@ -46,7 +46,7 @@ public class DruidCluster
public void add(ServerHolder serverHolder)
{
DruidServer server = serverHolder.getServer();
ImmutableDruidServer server = serverHolder.getServer();
MinMaxPriorityQueue<ServerHolder> tierServers = cluster.get(server.getTier());
if (tierServers == null) {
tierServers = MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create();

View File

@ -31,12 +31,10 @@ import com.google.common.io.Closeables;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ExecutorServices;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
@ -44,6 +42,8 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.client.ServerInventoryView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.collections.CountingMap;
@ -345,12 +345,12 @@ public class DruidCoordinator
public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback)
{
try {
final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
final ImmutableDruidServer fromServer = serverInventoryView.getInventoryValue(from).toImmutableDruidServer();
if (fromServer == null) {
throw new IAE("Unable to find server [%s]", from);
}
final DruidServer toServer = serverInventoryView.getInventoryValue(to);
final ImmutableDruidServer toServer = serverInventoryView.getInventoryValue(to).toImmutableDruidServer();
if (toServer == null) {
throw new IAE("Unable to find server [%s]", to);
}
@ -791,7 +791,7 @@ public class DruidCoordinator
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
// Display info about all historical servers
Iterable<DruidServer> servers = FunctionalIterable
Iterable<ImmutableDruidServer> servers = FunctionalIterable
.create(serverInventoryView.getInventory())
.filter(
new Predicate<DruidServer>()
@ -804,14 +804,23 @@ public class DruidCoordinator
return input.isAssignable();
}
}
).transform(
new Function<DruidServer, ImmutableDruidServer>()
{
@Override
public ImmutableDruidServer apply(DruidServer input)
{
return input.toImmutableDruidServer();
}
}
);
if (log.isDebugEnabled()) {
log.debug("Servers");
for (DruidServer druidServer : servers) {
for (ImmutableDruidServer druidServer : servers) {
log.debug(" %s", druidServer);
log.debug(" -- DataSources");
for (DruidDataSource druidDataSource : druidServer.getDataSources()) {
for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
log.debug(" %s", druidDataSource);
}
}
@ -819,7 +828,7 @@ public class DruidCoordinator
// Find all historical servers, group them by subType and sort by ascending usage
final DruidCluster cluster = new DruidCluster();
for (DruidServer server : servers) {
for (ImmutableDruidServer server : servers) {
if (!loadManagementPeons.containsKey(server.getName())) {
String basePath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName());
LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(basePath);
@ -835,7 +844,7 @@ public class DruidCoordinator
// Stop peons for servers that aren't there anymore.
final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());
for (DruidServer server : servers) {
for (ImmutableDruidServer server : servers) {
disappeared.remove(server.getName());
}
for (String name : disappeared) {

View File

@ -23,7 +23,7 @@ import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Table;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.timeline.DataSegment;
import java.util.Map;
@ -40,7 +40,7 @@ public class SegmentReplicantLookup
for (MinMaxPriorityQueue<ServerHolder> serversByType : cluster.getSortedServersByTier()) {
for (ServerHolder serverHolder : serversByType) {
DruidServer server = serverHolder.getServer();
ImmutableDruidServer server = serverHolder.getServer();
for (DataSegment segment : server.getSegments().values()) {
Integer numReplicants = segmentsInCluster.get(segment.getIdentifier(), server.getTier());

View File

@ -21,6 +21,7 @@ package io.druid.server.coordinator;
import com.metamx.common.logger.Logger;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.timeline.DataSegment;
/**
@ -28,11 +29,11 @@ import io.druid.timeline.DataSegment;
public class ServerHolder implements Comparable<ServerHolder>
{
private static final Logger log = new Logger(ServerHolder.class);
private final DruidServer server;
private final ImmutableDruidServer server;
private final LoadQueuePeon peon;
public ServerHolder(
DruidServer server,
ImmutableDruidServer server,
LoadQueuePeon peon
)
{
@ -40,7 +41,7 @@ public class ServerHolder implements Comparable<ServerHolder>
this.peon = peon;
}
public DruidServer getServer()
public ImmutableDruidServer getServer()
{
return server;
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.guava.Comparators;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.server.coordinator.BalancerSegmentHolder;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
@ -149,7 +150,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
protected void moveSegment(
final BalancerSegmentHolder segment,
final DruidServer toServer,
final ImmutableDruidServer toServer,
final DruidCoordinatorRuntimeParams params
)
{

View File

@ -25,6 +25,8 @@ import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;
@ -32,7 +34,6 @@ import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ServerHolder;
import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -65,9 +66,9 @@ public class DruidCoordinatorCleanup implements DruidCoordinatorHelper
// Drop segments that no longer exist in the available segments configuration
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) {
for (ServerHolder serverHolder : serverHolders) {
DruidServer server = serverHolder.getServer();
ImmutableDruidServer server = serverHolder.getServer();
for (DruidDataSource dataSource : server.getDataSources()) {
for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
for (DataSegment segment : dataSource.getSegments()) {
if (!availableSegments.contains(segment)) {
LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());
@ -97,9 +98,9 @@ public class DruidCoordinatorCleanup implements DruidCoordinatorHelper
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) {
for (ServerHolder serverHolder : serverHolders) {
DruidServer server = serverHolder.getServer();
ImmutableDruidServer server = serverHolder.getServer();
for (DruidDataSource dataSource : server.getDataSources()) {
for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSource.getName());
if (timeline == null) {
timeline = new VersionedIntervalTimeline<String, DataSegment>(Comparators.comparable());

View File

@ -25,7 +25,7 @@ import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.collections.CountingMap;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
@ -156,7 +156,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
log.info("Load Queues:");
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) {
for (ServerHolder serverHolder : serverHolders) {
DruidServer server = serverHolder.getServer();
ImmutableDruidServer server = serverHolder.getServer();
LoadQueuePeon queuePeon = serverHolder.getPeon();
log.info(
"Server[%s, %s, %s] has %,d left to load, %,d left to drop, %,d bytes queued, %,d bytes served.",

View File

@ -21,7 +21,7 @@ package io.druid.server.coordination;
import com.google.api.client.util.Lists;
import com.google.common.collect.Maps;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CostBalancerStrategy;
import io.druid.server.coordinator.LoadQueuePeonTester;
@ -52,7 +52,7 @@ public class CostBalancerStrategyTest
// Each having having 100 segments
for (int i = 0; i < serverCount; i++) {
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
DruidServer druidServer = EasyMock.createMock(DruidServer.class);
ImmutableDruidServer druidServer = EasyMock.createMock(ImmutableDruidServer.class);
EasyMock.expect(druidServer.getName()).andReturn("DruidServer_Name_" + i).anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes();
EasyMock.expect(druidServer.getMaxSize()).andReturn(10000000L).anyTimes();
@ -73,7 +73,7 @@ public class CostBalancerStrategyTest
// The best server to be available for next segment assignment has only 98 Segments
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
DruidServer druidServer = EasyMock.createMock(DruidServer.class);
ImmutableDruidServer druidServer = EasyMock.createMock(ImmutableDruidServer.class);
EasyMock.expect(druidServer.getName()).andReturn("BEST_SERVER").anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes();
EasyMock.expect(druidServer.getMaxSize()).andReturn(10000000L).anyTimes();

View File

@ -28,6 +28,7 @@ import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.db.DatabaseRuleManager;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.rules.PeriodLoadRule;
@ -49,8 +50,8 @@ public class DruidCoordinatorBalancerProfiler
{
private static final int MAX_SEGMENTS_TO_MOVE = 5;
private DruidCoordinator coordinator;
private DruidServer druidServer1;
private DruidServer druidServer2;
private ImmutableDruidServer druidServer1;
private ImmutableDruidServer druidServer2;
Map<String, DataSegment> segments = Maps.newHashMap();
ServiceEmitter emitter;
DatabaseRuleManager manager;
@ -61,8 +62,8 @@ public class DruidCoordinatorBalancerProfiler
public void setUp() throws Exception
{
coordinator = EasyMock.createMock(DruidCoordinator.class);
druidServer1 = EasyMock.createMock(DruidServer.class);
druidServer2 = EasyMock.createMock(DruidServer.class);
druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer2 = EasyMock.createMock(ImmutableDruidServer.class);
emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
manager = EasyMock.createMock(DatabaseRuleManager.class);
@ -109,7 +110,7 @@ public class DruidCoordinatorBalancerProfiler
}
for (int i = 0; i < numServers; i++) {
DruidServer server = EasyMock.createMock(DruidServer.class);
ImmutableDruidServer server = EasyMock.createMock(ImmutableDruidServer.class);
EasyMock.expect(server.getMetadata()).andReturn(null).anyTimes();
EasyMock.expect(server.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(server.getMaxSize()).andReturn(100L).atLeastOnce();

View File

@ -23,7 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
@ -44,10 +44,10 @@ public class DruidCoordinatorBalancerTest
{
private static final int MAX_SEGMENTS_TO_MOVE = 5;
private DruidCoordinator coordinator;
private DruidServer druidServer1;
private DruidServer druidServer2;
private DruidServer druidServer3;
private DruidServer druidServer4;
private ImmutableDruidServer druidServer1;
private ImmutableDruidServer druidServer2;
private ImmutableDruidServer druidServer3;
private ImmutableDruidServer druidServer4;
private DataSegment segment1;
private DataSegment segment2;
private DataSegment segment3;
@ -58,10 +58,10 @@ public class DruidCoordinatorBalancerTest
public void setUp() throws Exception
{
coordinator = EasyMock.createMock(DruidCoordinator.class);
druidServer1 = EasyMock.createMock(DruidServer.class);
druidServer2 = EasyMock.createMock(DruidServer.class);
druidServer3 = EasyMock.createMock(DruidServer.class);
druidServer4 = EasyMock.createMock(DruidServer.class);
druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer2 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer3 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer4 = EasyMock.createMock(ImmutableDruidServer.class);
segment1 = EasyMock.createMock(DataSegment.class);
segment2 = EasyMock.createMock(DataSegment.class);
segment3 = EasyMock.createMock(DataSegment.class);

View File

@ -19,7 +19,7 @@
package io.druid.server.coordinator;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.server.coordinator.helper.DruidCoordinatorBalancer;
import io.druid.timeline.DataSegment;
@ -33,7 +33,7 @@ public class DruidCoordinatorBalancerTester extends DruidCoordinatorBalancer
@Override
protected void moveSegment(
final BalancerSegmentHolder segment,
final DruidServer toServer,
final ImmutableDruidServer toServer,
final DruidCoordinatorRuntimeParams params
)
{

View File

@ -135,7 +135,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -151,7 +151,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"normal",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -167,7 +167,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"cold",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -233,7 +233,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
@ -244,7 +244,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -260,7 +260,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"cold",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -337,7 +337,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -346,7 +346,7 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
normServer,
normServer.toImmutableDruidServer(),
mockPeon
)
)
@ -410,7 +410,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"normal",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -461,7 +461,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"normal",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -522,7 +522,7 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server,
server.toImmutableDruidServer(),
mockPeon
)
)
@ -594,11 +594,11 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
server1.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
server2,
server2.toImmutableDruidServer(),
mockPeon
)
)
@ -672,7 +672,7 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
server1.toImmutableDruidServer(),
mockPeon
)
)
@ -681,7 +681,7 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server2,
server2.toImmutableDruidServer(),
mockPeon
)
)
@ -751,7 +751,7 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
server1.toImmutableDruidServer(),
mockPeon
)
)
@ -760,7 +760,7 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server2,
server2.toImmutableDruidServer(),
mockPeon
)
)
@ -844,15 +844,15 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
server1.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
server2,
server2.toImmutableDruidServer(),
anotherMockPeon
),
new ServerHolder(
server3,
server3.toImmutableDruidServer(),
anotherMockPeon
)
)
@ -915,7 +915,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
@ -926,7 +926,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -1023,7 +1023,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -1039,7 +1039,7 @@ public class DruidCoordinatorRuleRunnerTest
"historical",
DruidServer.DEFAULT_TIER,
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -1126,11 +1126,11 @@ public class DruidCoordinatorRuleRunnerTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
server1.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
server2,
server2.toImmutableDruidServer(),
mockPeon
)
)

View File

@ -19,14 +19,17 @@
package io.druid.server.coordinator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.client.SingleServerInventoryView;
import io.druid.curator.discovery.NoopServiceAnnouncer;
import io.druid.curator.inventory.InventoryManagerConfig;
import io.druid.db.DatabaseSegmentManager;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
@ -96,7 +99,8 @@ public class DruidCoordinatorTest
return null;
}
},
new ZkPathsConfig(){
new ZkPathsConfig()
{
@Override
public String getZkBasePath()
@ -130,10 +134,14 @@ public class DruidCoordinatorTest
@Test
public void testMoveSegment() throws Exception
{
EasyMock.expect(druidServer.getSegment("dummySegment")).andReturn(segment);
EasyMock.expect(druidServer.getMaxSize()).andReturn(new Long(5));
EasyMock.expect(druidServer.getCurrSize()).andReturn(new Long(1)).atLeastOnce();
EasyMock.expect(druidServer.getName()).andReturn("blah");
EasyMock.expect(druidServer.toImmutableDruidServer()).andReturn(
new ImmutableDruidServer(
new DruidServerMetadata("blah", null, 5L, null, null, 0),
1L,
null,
ImmutableMap.of("dummySegment", segment)
)
).atLeastOnce();
EasyMock.replay(druidServer);
loadManagementPeons.put("to", loadQueuePeon);
@ -144,20 +152,22 @@ public class DruidCoordinatorTest
EasyMock.expect(serverInventoryView.getInventoryValue("from")).andReturn(druidServer);
EasyMock.expect(serverInventoryView.getInventoryValue("to")).andReturn(druidServer);
EasyMock.expect(serverInventoryView.getInventoryManagerConfig()).andReturn(new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return "";
}
EasyMock.expect(serverInventoryView.getInventoryManagerConfig()).andReturn(
new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return "";
}
@Override
public String getInventoryPath()
{
return "";
}
});
@Override
public String getInventoryPath()
{
return "";
}
}
);
EasyMock.replay(serverInventoryView);
coordinator.moveSegment("from", "to", "dummySegment", null);

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
@ -37,10 +38,10 @@ import java.util.Map;
public class ReservoirSegmentSamplerTest
{
private DruidServer druidServer1;
private DruidServer druidServer2;
private DruidServer druidServer3;
private DruidServer druidServer4;
private ImmutableDruidServer druidServer1;
private ImmutableDruidServer druidServer2;
private ImmutableDruidServer druidServer3;
private ImmutableDruidServer druidServer4;
private ServerHolder holder1;
private ServerHolder holder2;
@ -60,11 +61,11 @@ public class ReservoirSegmentSamplerTest
@Before
public void setUp() throws Exception
{
druidServer1 = EasyMock.createMock(DruidServer.class);
druidServer1 = EasyMock.createMock(DruidServer.class);
druidServer2 = EasyMock.createMock(DruidServer.class);
druidServer3 = EasyMock.createMock(DruidServer.class);
druidServer4 = EasyMock.createMock(DruidServer.class);
druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer2 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer3 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer4 = EasyMock.createMock(ImmutableDruidServer.class);
holder1 = EasyMock.createMock(ServerHolder.class);
holder2 = EasyMock.createMock(ServerHolder.class);
holder3 = EasyMock.createMock(ServerHolder.class);

View File

@ -143,7 +143,7 @@ public class LoadRuleTest
"historical",
"hot",
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -159,7 +159,7 @@ public class LoadRuleTest
"historical",
DruidServer.DEFAULT_TIER,
0
),
).toImmutableDruidServer(),
mockPeon
)
)
@ -252,7 +252,7 @@ public class LoadRuleTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
server1.toImmutableDruidServer(),
mockPeon
)
)
@ -261,7 +261,7 @@ public class LoadRuleTest
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server2,
server2.toImmutableDruidServer(),
mockPeon
)
)