Coordinator balancer move then drop fix (#5528)

* #5521 part 1

* formatting

* oops

* less magic tests
Clint Wylie 2018-03-29 10:30:12 -07:00 committed by Jihoon Son
parent 8878a7ff94
commit 30fc4d3ba0
6 changed files with 678 additions and 182 deletions

AbstractCuratorServerInventoryView.java

@@ -23,17 +23,16 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.curator.inventory.CuratorInventoryManager;
import io.druid.curator.inventory.CuratorInventoryManagerStrategy;
import io.druid.curator.inventory.InventoryManagerConfig;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
import java.util.Collection;
@@ -157,14 +156,7 @@ public abstract class AbstractCuratorServerInventoryView<InventoryType> implemen
{
log.info("Inventory Initialized");
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentViewInitialized();
}
}
input -> input.segmentViewInitialized()
);
}
}
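
Most of the churn in this file is mechanical: single-method anonymous classes collapse into lambdas with no behavior change. Restating the pattern from the hunk above as a before/after sketch:

    // Before: anonymous class implementing the single-method Function interface
    runSegmentCallbacks(
        new Function<SegmentCallback, CallbackAction>()
        {
          @Override
          public CallbackAction apply(SegmentCallback input)
          {
            return input.segmentViewInitialized();
          }
        }
    );

    // After: the equivalent lambda; parameter and return types are inferred
    // from the Function<SegmentCallback, CallbackAction> target type
    runSegmentCallbacks(input -> input.segmentViewInitialized());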
@@ -233,17 +225,12 @@ public abstract class AbstractCuratorServerInventoryView<InventoryType> implemen
{
for (final Map.Entry<SegmentCallback, Executor> entry : segmentCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
() -> {
if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
segmentCallbackRemoved(entry.getKey());
segmentCallbacks.remove(entry.getKey());
}
}
}
);
}
}
@@ -252,16 +239,11 @@ public abstract class AbstractCuratorServerInventoryView<InventoryType> implemen
{
for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverRemovedCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
() -> {
if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
serverRemovedCallbacks.remove(entry.getKey());
}
}
}
);
}
}
@@ -286,14 +268,7 @@ public abstract class AbstractCuratorServerInventoryView<InventoryType> implemen
container.addDataSegment(inventory);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentAdded(container.getMetadata(), inventory);
}
}
input -> input.segmentAdded(container.getMetadata(), inventory)
);
}
@@ -315,14 +290,7 @@ public abstract class AbstractCuratorServerInventoryView<InventoryType> implemen
container.removeDataSegment(inventoryKey);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentRemoved(container.getMetadata(), segment);
}
}
input -> input.segmentRemoved(container.getMetadata(), segment)
);
}
@@ -330,11 +298,8 @@ public abstract class AbstractCuratorServerInventoryView<InventoryType> implemen
public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
{
try {
String toServedSegPath = ZKPaths.makePath(
ZKPaths.makePath(getInventoryManagerConfig().getInventoryPath(), serverKey),
segment.getIdentifier()
);
return curator.checkExists().forPath(toServedSegPath) != null;
DruidServer server = getInventoryValue(serverKey);
return server != null && server.getSegment(segment.getIdentifier()) != null;
}
catch (Exception ex) {
throw Throwables.propagate(ex);

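The substantive change in this file is isSegmentLoadedByServer: instead of a synchronous ZooKeeper existence check against the served-segments path, it now answers from the inventory view the process already maintains in memory, which also lines it up with the move-then-drop behavior the new test exercises. Restating the two approaches from the hunk above (names follow the diff; this is a sketch, not a compilable unit):

    // Old: a synchronous ZK round trip per check
    String toServedSegPath = ZKPaths.makePath(
        ZKPaths.makePath(getInventoryManagerConfig().getInventoryPath(), serverKey),
        segment.getIdentifier()
    );
    boolean loaded = curator.checkExists().forPath(toServedSegPath) != null;

    // New: answered from the in-memory inventory, the same state the
    // balancer consults when it decides to move a segment and then drop it
    DruidServer server = getInventoryValue(serverKey);
    boolean loaded = server != null && server.getSegment(segment.getIdentifier()) != null;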
CuratorLoadQueuePeon.java

@@ -34,14 +34,12 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
@@ -261,11 +259,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
processingExecutor.schedule(
new Runnable()
{
@Override
public void run()
{
() -> {
try {
if (curator.checkExists().forPath(path) != null) {
failAssign(new ISE("%s was never removed! Failing this operation!", path));
@@ -274,18 +268,13 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
catch (Exception e) {
failAssign(e);
}
}
},
config.getLoadTimeoutDelay().getMillis(),
TimeUnit.MILLISECONDS
);
final Stat stat = curator.checkExists().usingWatcher(
new CuratorWatcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
(CuratorWatcher) watchedEvent -> {
switch (watchedEvent.getType()) {
case NodeDeleted:
entryRemoved(watchedEvent.getPath());
@@ -294,7 +283,6 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
// do nothing
}
}
}
).forPath(path);
if (stat == null) {
@@ -341,14 +329,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
currentlyProcessing = null;
callBackExecutor.execute(
new Runnable()
{
@Override
public void run()
{
executeCallbacks(callbacks);
}
}
() -> executeCallbacks(callbacks)
);
}
}
@@ -360,11 +341,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
processingExecutor,
config.getLoadQueuePeonRepeatDelay(),
config.getLoadQueuePeonRepeatDelay(),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call()
{
() -> {
processSegmentChangeRequest();
if (stopped) {
@ -373,7 +350,6 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
return ScheduledExecutors.Signal.REPEAT;
}
}
}
);
}

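Two details in this file are worth calling out. The schedule(...) lambda above implements a load timeout: after config.getLoadTimeoutDelay() it checks whether the request znode is still present and fails the assignment if so. And the watcher lambda needs its explicit (CuratorWatcher) cast because usingWatcher(...) is overloaded for both org.apache.zookeeper.Watcher and CuratorWatcher, so the cast selects the functional interface. A self-contained sketch of the one-shot watch idiom (the path is illustrative):

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.api.CuratorWatcher;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.data.Stat;

    static Stat watchOnce(CuratorFramework curator, String path) throws Exception
    {
      // The cast picks the CuratorWatcher overload; ZK watches are one-shot,
      // so this fires at most once, for the next event on the path.
      return curator.checkExists()
                    .usingWatcher((CuratorWatcher) event -> {
                      if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                        // a real caller would clean up queue state here
                        System.out.println("znode removed: " + event.getPath());
                      }
                    })
                    .forPath(path);
    }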
DruidCoordinator.java

@@ -20,7 +20,6 @@
package io.druid.server.coordinator;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -30,8 +29,6 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidDataSource;
@@ -56,6 +53,8 @@ import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.guava.FunctionalIterable;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.metadata.MetadataRuleManager;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.server.DruidNode;
@@ -98,14 +97,8 @@ public class DruidCoordinator
{
public static Comparator<DataSegment> SEGMENT_COMPARATOR = Ordering.from(Comparators.intervalsByEndThenStart())
.onResultOf(
new Function<DataSegment, Interval>()
{
@Override
public Interval apply(DataSegment segment)
{
return segment.getInterval();
}
})
(Function<DataSegment, Interval>) segment -> segment
.getInterval())
.compound(Ordering.<DataSegment>natural())
.reverse();
@@ -572,7 +565,8 @@ public class DruidCoordinator
if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) {
theRunnable.run();
}
if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { // (We might no longer be leader)
if (coordLeaderSelector.isLeader()
&& startingLeaderCounter == coordLeaderSelector.localTerm()) { // (We might no longer be leader)
return ScheduledExecutors.Signal.REPEAT;
} else {
return ScheduledExecutors.Signal.STOP;
@@ -697,35 +691,12 @@ public class DruidCoordinator
super(
ImmutableList.of(
new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this),
new DruidCoordinatorHelper()
{
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
params -> {
// Display info about all historical servers
Iterable<ImmutableDruidServer> servers = FunctionalIterable
.create(serverInventoryView.getInventory())
.filter(
new Predicate<DruidServer>()
{
@Override
public boolean apply(
DruidServer input
)
{
return input.segmentReplicatable();
}
}
).transform(
new Function<DruidServer, ImmutableDruidServer>()
{
@Override
public ImmutableDruidServer apply(DruidServer input)
{
return input.toImmutableDruidServer();
}
}
);
.filter(DruidServer::segmentReplicatable)
.transform(DruidServer::toImmutableDruidServer);
if (log.isDebugEnabled()) {
log.debug("Servers");
@@ -772,7 +743,6 @@ public class DruidCoordinator
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(DateTimes.nowUtc())
.build();
}
},
new DruidCoordinatorRuleRunner(DruidCoordinator.this),
new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),

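Here the Guava FunctionalIterable chain over the server inventory tightens from two anonymous classes into method references. For comparison, a sketch of the same filter/transform shape using JDK streams, with inventory standing in for serverInventoryView.getInventory():

    import java.util.Collection;
    import java.util.List;
    import java.util.stream.Collectors;

    static List<ImmutableDruidServer> replicatableServers(Collection<DruidServer> inventory)
    {
      return inventory.stream()
                      .filter(DruidServer::segmentReplicatable)   // historicals only
                      .map(DruidServer::toImmutableDruidServer)   // immutable snapshots
                      .collect(Collectors.toList());
    }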
DruidCoordinatorRuleRunner.java

@@ -20,9 +20,9 @@
package io.druid.server.coordinator.helper;
import com.google.common.collect.Lists;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.emitter.EmittingLogger;
import com.google.common.collect.Ordering;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.metadata.MetadataRuleManager;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
@@ -92,7 +92,7 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
for (DataSegment segment : params.getAvailableSegments()) {
VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(segment.getDataSource());
if (timeline == null) {
timeline = new VersionedIntervalTimeline<>(Comparators.comparable());
timeline = new VersionedIntervalTimeline<>(Ordering.natural());
timelines.put(segment.getDataSource(), timeline);
}

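The one functional-looking line here swaps Comparators.comparable() for Guava's Ordering.natural() as the timeline's version comparator; both impose natural ordering on the String versions, so this appears to be a cleanup rather than a behavior change. A tiny illustration:

    import com.google.common.collect.Ordering;

    // Natural ordering on version strings is lexicographic, so "v1" sorts
    // before "v2" and the timeline prefers the higher version.
    Ordering<String> versionOrdering = Ordering.natural();
    assert versionOrdering.compare("v1", "v2") < 0;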
CuratorTestBase.java

@@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import io.druid.client.DruidServer;
import io.druid.common.utils.UUIDUtils;
import io.druid.java.util.common.DateTimes;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
@@ -44,6 +46,8 @@ public class CuratorTestBase
protected Timing timing;
protected CuratorFramework curator;
private int batchCtr = 0;
protected void setupServerAndCurator() throws Exception
{
server = new TestingServer();
@@ -127,6 +131,47 @@
}
}
protected String announceBatchSegmentsForServer(
DruidServer druidServer,
ImmutableSet<DataSegment> segments,
ZkPathsConfig zkPathsConfig,
ObjectMapper jsonMapper
)
{
final String segmentAnnouncementPath = ZKPaths.makePath(ZKPaths.makePath(
zkPathsConfig.getLiveSegmentsPath(),
druidServer.getHost()),
UUIDUtils.generateUuid(
druidServer.getHost(),
druidServer.getType().toString(),
druidServer.getTier(),
DateTimes.nowUtc().toString()
) + String.valueOf(batchCtr++)
);
try {
curator.create()
.compressed()
.withMode(CreateMode.EPHEMERAL)
.forPath(segmentAnnouncementPath, jsonMapper.writeValueAsBytes(segments));
}
catch (KeeperException.NodeExistsException e) {
try {
curator.setData()
.forPath(segmentAnnouncementPath, jsonMapper.writeValueAsBytes(segments));
}
catch (Exception e1) {
Throwables.propagate(e1);
}
}
catch (Exception e) {
Throwables.propagate(e);
}
return segmentAnnouncementPath;
}
protected void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment, ZkPathsConfig zkPathsConfig)
throws Exception
{
@@ -138,6 +183,12 @@
);
}
protected void unannounceSegmentFromBatchForServer(DruidServer druidServer, DataSegment segment, String batchPath, ZkPathsConfig zkPathsConfig)
throws Exception
{
curator.delete().guaranteed().forPath(batchPath);
}
protected void tearDownServerAndCurator()
{
try {

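The new announceBatchSegmentsForServer helper mirrors how a historical batch-announces segments: the whole set is serialized into a single compressed, ephemeral znode under the live-segments path, at a UUID-plus-counter child name, and the returned path is the handle for later teardown. Note that unannounceSegmentFromBatchForServer deletes the entire batch znode, not an individual segment. A usage sketch, where someSegment is a hypothetical DataSegment:

    DruidServer server = new DruidServer(
        "localhost:1", "localhost:1", null, 10000000L,
        ServerType.HISTORICAL, "default_tier", 0
    );
    // Announce a one-segment batch; the returned znode path identifies the batch.
    String batchPath = announceBatchSegmentsForServer(
        server, ImmutableSet.of(someSegment), zkPathsConfig, jsonMapper
    );
    // Dropping the batch later is just deleting that znode.
    curator.delete().guaranteed().forPath(batchPath);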
CuratorDruidCoordinatorTest.java (new file)

@@ -0,0 +1,534 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import io.druid.client.BatchServerInventoryView;
import io.druid.client.CoordinatorServerView;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.common.config.JacksonConfigManager;
import io.druid.curator.CuratorTestBase;
import io.druid.curator.discovery.NoopServiceAnnouncer;
import io.druid.discovery.DruidLeaderSelector;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import io.druid.metadata.MetadataRuleManager;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.segment.TestHelper;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.lookup.cache.LookupCoordinatorManager;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.ZKPaths;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* This tests ZooKeeper-specific coordinator/load queue/historical interactions, such as moving segments by the balancer
*/
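// Flow: three segments are batch-announced on a source historical and one on a
// destination; coordinator.moveSegment() enqueues a load on the destination,
// listeners below play the part of the historicals (destination announces the
// segment, source unannounces it), and the final assertions check that each
// server ends up holding exactly two segments.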
public class CuratorDruidCoordinatorTest extends CuratorTestBase
{
private DruidCoordinator coordinator;
private MetadataSegmentManager databaseSegmentManager;
private ScheduledExecutorFactory scheduledExecutorFactory;
private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
private LoadQueuePeon sourceLoadQueuePeon;
private LoadQueuePeon destinationLoadQueuePeon;
private MetadataRuleManager metadataRuleManager;
private CountDownLatch leaderAnnouncerLatch;
private CountDownLatch leaderUnannouncerLatch;
private PathChildrenCache sourceLoadQueueChildrenCache;
private PathChildrenCache destinationLoadQueueChildrenCache;
private DruidCoordinatorConfig druidCoordinatorConfig;
private ObjectMapper objectMapper;
private JacksonConfigManager configManager;
private DruidNode druidNode;
private static final String SEGPATH = "/druid/segments";
private static final String SOURCE_LOAD_PATH = "/druid/loadQueue/localhost:1";
private static final String DESTINATION_LOAD_PATH = "/druid/loadQueue/localhost:2";
private static final long COORDINATOR_START_DELAY = 1;
private static final long COORDINATOR_PERIOD = 100;
private BatchServerInventoryView baseView;
private CoordinatorServerView serverView;
private CountDownLatch segmentViewInitLatch;
private CountDownLatch segmentAddedLatch;
private CountDownLatch segmentRemovedLatch;
private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPathsConfig;
public CuratorDruidCoordinatorTest()
{
jsonMapper = TestHelper.makeJsonMapper();
zkPathsConfig = new ZkPathsConfig();
}
@Before
public void setUp() throws Exception
{
databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference(CoordinatorDynamicConfig.builder().build())).anyTimes();
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference(CoordinatorCompactionConfig.empty())).anyTimes();
EasyMock.replay(configManager);
setupServerAndCurator();
curator.start();
curator.blockUntilConnected();
curator.create().creatingParentsIfNeeded().forPath(SEGPATH);
curator.create().creatingParentsIfNeeded().forPath(SOURCE_LOAD_PATH);
curator.create().creatingParentsIfNeeded().forPath(DESTINATION_LOAD_PATH);
objectMapper = new DefaultObjectMapper();
druidCoordinatorConfig = new TestDruidCoordinatorConfig(
new Duration(COORDINATOR_START_DELAY),
new Duration(COORDINATOR_PERIOD),
null,
null,
new Duration(COORDINATOR_PERIOD),
null,
10,
null,
false,
false,
new Duration("PT0s")
);
sourceLoadQueueChildrenCache = new PathChildrenCache(
curator,
SOURCE_LOAD_PATH,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_src-%d")
);
destinationLoadQueueChildrenCache = new PathChildrenCache(
curator,
DESTINATION_LOAD_PATH,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_dest-%d")
);
sourceLoadQueuePeon = new CuratorLoadQueuePeon(
curator,
SOURCE_LOAD_PATH,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_src_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_src-%d"),
druidCoordinatorConfig
);
destinationLoadQueuePeon = new CuratorLoadQueuePeon(
curator,
DESTINATION_LOAD_PATH,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_dest_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_dest-%d"),
druidCoordinatorConfig
);
druidNode = new DruidNode("hey", "what", 1234, null, true, false);
loadManagementPeons = new ConcurrentHashMap<>();
scheduledExecutorFactory = (corePoolSize, nameFormat) -> Executors.newSingleThreadScheduledExecutor();
leaderAnnouncerLatch = new CountDownLatch(1);
leaderUnannouncerLatch = new CountDownLatch(1);
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
new ZkPathsConfig()
{
@Override
public String getBase()
{
return "druid";
}
},
configManager,
databaseSegmentManager,
baseView,
metadataRuleManager,
curator,
new NoopServiceEmitter(),
scheduledExecutorFactory,
null,
null,
new NoopServiceAnnouncer()
{
@Override
public void announce(DruidNode node)
{
// count down when this coordinator becomes the leader
leaderAnnouncerLatch.countDown();
}
@Override
public void unannounce(DruidNode node)
{
leaderUnannouncerLatch.countDown();
}
},
druidNode,
loadManagementPeons,
null,
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector()
);
}
@After
public void tearDown() throws Exception
{
baseView.stop();
sourceLoadQueuePeon.stop();
sourceLoadQueueChildrenCache.close();
destinationLoadQueueChildrenCache.close();
tearDownServerAndCurator();
}
@Test(timeout = 5_000)
public void testMoveSegment() throws Exception
{
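// Latch counts for setup: one view-init callback, four segment announcements
// (three on the source server plus one on the destination), and no drops yet.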
segmentViewInitLatch = new CountDownLatch(1);
segmentAddedLatch = new CountDownLatch(4);
segmentRemovedLatch = new CountDownLatch(0);
CountDownLatch destCountdown = new CountDownLatch(1);
CountDownLatch srcCountdown = new CountDownLatch(1);
setupView();
DruidServer source = new DruidServer(
"localhost:1",
"localhost:1",
null,
10000000L,
ServerType.HISTORICAL,
"default_tier",
0
);
DruidServer dest = new DruidServer(
"localhost:2",
"localhost:2",
null,
10000000L,
ServerType.HISTORICAL,
"default_tier",
0
);
setupZNodeForServer(source, zkPathsConfig, jsonMapper);
setupZNodeForServer(dest, zkPathsConfig, jsonMapper);
final List<DataSegment> sourceSegments = Lists.transform(
ImmutableList.of(
Pair.of("2011-04-01/2011-04-03", "v1"),
Pair.of("2011-04-03/2011-04-06", "v1"),
Pair.of("2011-04-06/2011-04-09", "v1")
),
input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs)
);
final List<DataSegment> destinationSegments = Lists.transform(
ImmutableList.of(
Pair.of("2011-03-31/2011-04-01", "v1")
),
input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs)
);
DataSegment segmentToMove = sourceSegments.get(2);
List<String> sourceSegKeys = Lists.newArrayList();
List<String> destSegKeys = Lists.newArrayList();
for (DataSegment segment : sourceSegments) {
sourceSegKeys.add(announceBatchSegmentsForServer(source, ImmutableSet.of(segment), zkPathsConfig, jsonMapper));
}
for (DataSegment segment : destinationSegments) {
destSegKeys.add(announceBatchSegmentsForServer(dest, ImmutableSet.of(segment), zkPathsConfig, jsonMapper));
}
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
// these child watchers are used to simulate actions of historicals, announcing a segment on noticing a load queue
// for the destination and unannouncing from source server when noticing a drop request
sourceLoadQueueChildrenCache.getListenable().addListener(
(curatorFramework, pathChildrenCacheEvent) -> {
if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
srcCountdown.countDown();
} else if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
//Simulate source server dropping segment
unannounceSegmentFromBatchForServer(source, segmentToMove, sourceSegKeys.get(2), zkPathsConfig);
}
}
);
destinationLoadQueueChildrenCache.getListenable().addListener(
(curatorFramework, pathChildrenCacheEvent) -> {
if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
destCountdown.countDown();
} else if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
//Simulate destination server loading segment
announceBatchSegmentsForServer(dest, ImmutableSet.of(segmentToMove), zkPathsConfig, jsonMapper);
}
}
);
sourceLoadQueueChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
destinationLoadQueueChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
Assert.assertTrue(timing.forWaiting().awaitLatch(srcCountdown));
Assert.assertTrue(timing.forWaiting().awaitLatch(destCountdown));
loadManagementPeons.put("localhost:1", sourceLoadQueuePeon);
loadManagementPeons.put("localhost:2", destinationLoadQueuePeon);
segmentRemovedLatch = new CountDownLatch(1);
segmentAddedLatch = new CountDownLatch(1);
ImmutableDruidDataSource druidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
EasyMock.expect(druidDataSource.getSegment(EasyMock.anyString())).andReturn(sourceSegments.get(2));
EasyMock.replay(druidDataSource);
EasyMock.expect(databaseSegmentManager.getInventoryValue(EasyMock.anyString())).andReturn(druidDataSource);
EasyMock.replay(databaseSegmentManager);
coordinator.moveSegment(
source.toImmutableDruidServer(),
dest.toImmutableDruidServer(),
sourceSegments.get(2),
null
);
// wait for destination server to load segment
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
// remove load queue key from destination server to trigger adding drop to load queue
curator.delete().guaranteed().forPath(ZKPaths.makePath(DESTINATION_LOAD_PATH, segmentToMove.getIdentifier()));
// wait for drop
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
// clean up drop from load queue
curator.delete().guaranteed().forPath(ZKPaths.makePath(SOURCE_LOAD_PATH, segmentToMove.getIdentifier()));
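// Each server should now hold two segments: the source dropped the moved
// segment (3 - 1) and the destination loaded it (1 + 1).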
List<DruidServer> servers = serverView.getInventory().stream().collect(Collectors.toList());
Assert.assertEquals(2, servers.get(0).getSegments().size());
Assert.assertEquals(2, servers.get(1).getSegments().size());
}
private static class TestDruidLeaderSelector implements DruidLeaderSelector
{
private volatile Listener listener;
private volatile String leader;
@Override
public String getCurrentLeader()
{
return leader;
}
@Override
public boolean isLeader()
{
return leader != null;
}
@Override
public int localTerm()
{
return 0;
}
@Override
public void registerListener(Listener listener)
{
this.listener = listener;
leader = "what:1234";
listener.becomeLeader();
}
@Override
public void unregisterListener()
{
leader = null;
listener.stopBeingLeader();
}
}
private void setupView() throws Exception
{
baseView = new BatchServerInventoryView(
zkPathsConfig,
curator,
jsonMapper,
Predicates.alwaysTrue()
)
{
@Override
public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
{
super.registerSegmentCallback(
exec, new SegmentCallback()
{
@Override
public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{
CallbackAction res = callback.segmentAdded(server, segment);
segmentAddedLatch.countDown();
return res;
}
@Override
public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
{
CallbackAction res = callback.segmentRemoved(server, segment);
segmentRemovedLatch.countDown();
return res;
}
@Override
public CallbackAction segmentViewInitialized()
{
CallbackAction res = callback.segmentViewInitialized();
segmentViewInitLatch.countDown();
return res;
}
}
);
}
};
serverView = new CoordinatorServerView(baseView);
baseView.start();
sourceLoadQueuePeon.start();
destinationLoadQueuePeon.start();
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
new ZkPathsConfig()
{
@Override
public String getBase()
{
return "druid";
}
},
configManager,
databaseSegmentManager,
baseView,
metadataRuleManager,
curator,
new NoopServiceEmitter(),
scheduledExecutorFactory,
null,
null,
new NoopServiceAnnouncer()
{
@Override
public void announce(DruidNode node)
{
// count down when this coordinator becomes the leader
leaderAnnouncerLatch.countDown();
}
@Override
public void unannounce(DruidNode node)
{
leaderUnannouncerLatch.countDown();
}
},
druidNode,
loadManagementPeons,
null,
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector()
);
}
private DataSegment dataSegmentWithIntervalAndVersion(String intervalStr, String version)
{
return DataSegment.builder()
.dataSource("test_curator_druid_coordinator")
.interval(Intervals.of(intervalStr))
.loadSpec(
ImmutableMap.of(
"type",
"local",
"path",
"somewhere"
)
)
.version(version)
.dimensions(ImmutableList.of())
.metrics(ImmutableList.of())
.shardSpec(NoneShardSpec.instance())
.binaryVersion(9)
.size(0)
.build();
}
}