Mark segments that are going to be dropped from the server and use it in DruidBalancer (#4486)

* Do not remove segments that should not be moved from currentlyMovingSegments (segments are removed by callbacks or are never inserted)

* Mark segments that are going to be dropped from the server and use this information in CostBalancerStrategy

* Fix tests
dgolitsyn 2017-08-26 02:25:56 +03:00 committed by Roman Leventov
parent 598cc46bae
commit fd0f349c68
9 changed files with 111 additions and 33 deletions
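
For orientation: the core idea of the patch is that a segment being moved is marked as "to be dropped" on its source server before the load on the destination is even issued, and the mark is cleared once the move completes or fails. Below is a minimal, self-contained sketch of that lifecycle using toy types only (not Druid's API; all names here are illustrative stand-ins). The real flow is in DruidCoordinator.moveSegment further down.

```java
import java.util.HashSet;
import java.util.Set;

// Toy walkthrough of the mark/load/unmark lifecycle this commit introduces.
// Every name here is an illustrative stand-in for the coordinator, peons and segments.
public class MoveLifecycleSketch
{
  interface LoadCallback
  {
    void execute();
  }

  // Pretend load queue: runs the callback synchronously for the sake of the example.
  private static void fakeLoad(String segment, LoadCallback callback)
  {
    System.out.println("loading " + segment);
    callback.execute();
  }

  public static void main(String[] args)
  {
    Set<String> segmentsMarkedToDrop = new HashSet<>();
    String segment = "wiki_2017-08-01T00:00:00.000Z_v1";

    // 1. Mark before loading, so a balancer strategy sees the pending drop immediately.
    segmentsMarkedToDrop.add(segment);
    try {
      // 2. Issue the load; the callback fires once the destination holds the segment.
      fakeLoad(segment, () -> {
        // 3a. Success path: clear the mark, then the source copy would be dropped.
        segmentsMarkedToDrop.remove(segment);
        System.out.println("loaded; marks left: " + segmentsMarkedToDrop);
      });
    }
    catch (RuntimeException e) {
      // 3b. Failure path: do not leave a stale mark behind.
      segmentsMarkedToDrop.remove(segment);
      throw e;
    }
  }
}
```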


@ -311,9 +311,12 @@ public class CostBalancerStrategy implements BalancerStrategy
)
);
// plus the costs of segments that will be loaded
cost += computeJointSegmentsCost(proposalSegment, server.getPeon().getSegmentsToLoad());
// minus the costs of segments that are marked to be dropped
cost -= computeJointSegmentsCost(proposalSegment, server.getPeon().getSegmentsMarkedToDrop());
return cost;
}
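
In effect, the candidate server's score now nets out in-flight state: segments already queued to load add cost, while segments marked to be dropped subtract it, so a segment that is mid-move is not double-counted against its source server. A self-contained toy version of that netting is sketched below (a flat per-segment cost of 1.0 stands in for Druid's joint cost function; all names are illustrative):

```java
import java.util.Set;

// Toy model of the per-server cost adjustment made above; every segment is assumed
// to contribute a flat joint cost of 1.0 against the proposal segment.
public class CostNettingSketch
{
  static double cost(Set<String> served, Set<String> toLoad, Set<String> markedToDrop)
  {
    double cost = served.size();       // segments already on the server
    cost += toLoad.size();             // plus segments that will be loaded
    cost -= markedToDrop.size();       // minus segments that are marked to be dropped
    return cost;
  }

  public static void main(String[] args)
  {
    // Server holds a and b, is loading c, and b is marked to be dropped (it is being moved away).
    System.out.println(cost(Set.of("a", "b"), Set.of("c"), Set.of("b"))); // prints 2.0
  }
}
```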


@ -423,28 +423,39 @@ public class DruidCoordinator
       ), segmentName
       );
-      loadPeon.loadSegment(
-          segmentToLoad,
-          new LoadPeonCallback()
-          {
-            @Override
-            public void execute()
-            {
+      final LoadPeonCallback loadPeonCallback = () -> {
+        dropPeon.unmarkSegmentToDrop(segmentToLoad);
+        if (callback != null) {
+          callback.execute();
+        }
+      };
+
+      // mark segment to drop before it is actually loaded on server
+      // to be able to account this information in DruidBalancerStrategy immediately
+      dropPeon.markSegmentToDrop(segmentToLoad);
+      try {
+        loadPeon.loadSegment(
+            segmentToLoad,
+            () -> {
               try {
                 if (serverInventoryView.isSegmentLoadedByServer(toServer.getName(), segment) &&
                     curator.checkExists().forPath(toLoadQueueSegPath) == null &&
                     !dropPeon.getSegmentsToDrop().contains(segment)) {
-                  dropPeon.dropSegment(segment, callback);
-                } else if (callback != null) {
-                  callback.execute();
+                  dropPeon.dropSegment(segment, loadPeonCallback);
+                } else {
+                  loadPeonCallback.execute();
                 }
               }
               catch (Exception e) {
                 throw Throwables.propagate(e);
               }
             }
-          }
-      );
+        );
+      }
+      catch (Exception e) {
+        dropPeon.unmarkSegmentToDrop(segmentToLoad);
+        Throwables.propagate(e);
+      }
     }
     catch (Exception e) {
       log.makeAlert(e, "Exception moving segment %s", segmentName).emit();


@ -43,6 +43,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -82,6 +83,9 @@ public class LoadQueuePeon
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
DruidCoordinator.SEGMENT_COMPARATOR
);
private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
DruidCoordinator.SEGMENT_COMPARATOR
);
private final Object lock = new Object();
@ -117,6 +121,12 @@ public class LoadQueuePeon
return segmentsToDrop.keySet();
}
@JsonProperty
public Set<DataSegment> getSegmentsMarkedToDrop()
{
return segmentsMarkedToDrop;
}
public long getLoadQueueSize()
{
return queuedSize.get();
@ -191,6 +201,16 @@ public class LoadQueuePeon
segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Collections.singletonList(callback)));
}
public void markSegmentToDrop(DataSegment dataSegment)
{
segmentsMarkedToDrop.add(dataSegment);
}
public void unmarkSegmentToDrop(DataSegment dataSegment)
{
segmentsMarkedToDrop.remove(dataSegment);
}
private void processSegmentChangeRequest()
{
if (currentlyProcessing != null) {
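
The peon-side bookkeeping is deliberately small: a concurrent sorted set that markSegmentToDrop/unmarkSegmentToDrop mutate and that getSegmentsMarkedToDrop exposes for the strategy to read. A stripped-down sketch of the same pattern follows, using String identifiers in place of DataSegment and DruidCoordinator.SEGMENT_COMPARATOR (names are illustrative):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

// Stripped-down model of the peon's marked-to-drop set (String ids stand in for DataSegment).
public class MarkedToDropSketch
{
  private final ConcurrentSkipListSet<String> segmentsMarkedToDrop = new ConcurrentSkipListSet<>();

  public void markSegmentToDrop(String segmentId)
  {
    segmentsMarkedToDrop.add(segmentId);
  }

  public void unmarkSegmentToDrop(String segmentId)
  {
    segmentsMarkedToDrop.remove(segmentId);
  }

  public Set<String> getSegmentsMarkedToDrop()
  {
    return segmentsMarkedToDrop;
  }

  public static void main(String[] args)
  {
    MarkedToDropSketch peon = new MarkedToDropSketch();
    peon.markSegmentToDrop("wiki_2017-08-01_v1");
    System.out.println(peon.getSegmentsMarkedToDrop()); // [wiki_2017-08-01_v1]
    peon.unmarkSegmentToDrop("wiki_2017-08-01_v1");
    System.out.println(peon.getSegmentsMarkedToDrop()); // []
  }
}
```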


@ -102,7 +102,6 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
return;
}
final List<ServerHolder> serverHolderList = Lists.newArrayList(servers);
if (serverHolderList.size() <= 1) {


@ -175,14 +175,25 @@ public class DruidCoordinatorBalancerTest
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);
BalancerStrategy predefinedPickOrderStrategy = new PredefinedPickOrderBalancerStrategy(
balancerStrategy,
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment1),
new BalancerSegmentHolder(druidServer1, segment2),
new BalancerSegmentHolder(druidServer1, segment3),
new BalancerSegmentHolder(druidServer1, segment4)
)
);
DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
).build();
)
.withBalancerStrategy(predefinedPickOrderStrategy)
.build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") < segments.size());
Assert.assertEquals(2, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}
@Test
@ -313,7 +324,13 @@ public class DruidCoordinatorBalancerTest
EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).atLeastOnce();
EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).atLeastOnce();
EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
if (!segments.isEmpty()) {
segments.values().forEach(
s -> EasyMock.expect(druidServer.getSegment(s.getIdentifier())).andReturn(s).anyTimes()
);
} else {
EasyMock.expect(druidServer.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
}
EasyMock.replay(druidServer);
}


@ -46,7 +46,7 @@ public class DruidCoordinatorBalancerTester extends DruidCoordinatorBalancer
final String segmentName = segmentToMove.getIdentifier();
if (!toPeon.getSegmentsToLoad().contains(segmentToMove) &&
!currentlyMovingSegments.get("normal").containsKey(segmentName) &&
(toServer.getSegment(segmentName) == null) &&
new ServerHolder(toServer, toPeon).getAvailableSize() > segmentToMove.getSize()) {
log.info(
"Moving [%s] from [%s] to [%s]",
@ -65,6 +65,9 @@ public class DruidCoordinatorBalancerTester extends DruidCoordinatorBalancer
}
});
final LoadQueuePeon dropPeon = params.getLoadManagementPeons().get(fromServerName);
dropPeon.markSegmentToDrop(segment.getSegment());
currentlyMovingSegments.get("normal").put(segmentName, segment);
}
catch (Exception e) {


@ -1441,9 +1441,7 @@ public class DruidCoordinatorRuleRunnerTest
private void mockCoordinator()
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
createCoordinatorDynamicConfig()
).anyTimes();
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
@ -1452,6 +1450,7 @@ public class DruidCoordinatorRuleRunnerTest
private void mockEmptyPeon()
{
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);


@ -56,6 +56,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.joda.time.Interval;
@ -66,6 +67,7 @@ import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@ -208,18 +210,31 @@ public class DruidCoordinatorTest extends CuratorTestBase
@Test
public void testMoveSegment() throws Exception
{
loadQueuePeon = EasyMock.createNiceMock(LoadQueuePeon.class);
segment = EasyMock.createNiceMock(DataSegment.class);
EasyMock.expect(segment.getIdentifier()).andReturn("dummySegment");
EasyMock.expect(segment.getDataSource()).andReturn("dummyDataSource");
EasyMock.replay(segment);
loadQueuePeon = EasyMock.createNiceMock(LoadQueuePeon.class);
EasyMock.expect(loadQueuePeon.getLoadQueueSize()).andReturn(new Long(1));
loadQueuePeon.markSegmentToDrop(segment);
EasyMock.expectLastCall().once();
Capture<LoadPeonCallback> loadCallbackCapture = Capture.newInstance();
Capture<LoadPeonCallback> dropCallbackCapture = Capture.newInstance();
loadQueuePeon.loadSegment(EasyMock.anyObject(DataSegment.class), EasyMock.capture(loadCallbackCapture));
EasyMock.expectLastCall().once();
loadQueuePeon.dropSegment(EasyMock.anyObject(DataSegment.class), EasyMock.capture(dropCallbackCapture));
EasyMock.expectLastCall().once();
loadQueuePeon.unmarkSegmentToDrop(segment);
EasyMock.expectLastCall().once();
EasyMock.expect(loadQueuePeon.getSegmentsToDrop()).andReturn(new HashSet<>()).once();
EasyMock.replay(loadQueuePeon);
DruidDataSource druidDataSource = EasyMock.createNiceMock(DruidDataSource.class);
EasyMock.expect(druidDataSource.getSegment(EasyMock.anyString())).andReturn(segment);
EasyMock.replay(druidDataSource);
EasyMock.expect(databaseSegmentManager.getInventoryValue(EasyMock.anyString())).andReturn(druidDataSource);
EasyMock.replay(databaseSegmentManager);
EasyMock.replay(loadQueuePeon);
scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
EasyMock.replay(scheduledExecutorFactory);
EasyMock.replay(metadataRuleManager);
@ -247,6 +262,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
loadManagementPeons.put("from", loadQueuePeon);
loadManagementPeons.put("to", loadQueuePeon);
EasyMock.expect(serverInventoryView.isSegmentLoadedByServer("to", segment)).andReturn(true).once();
EasyMock.replay(serverInventoryView);
coordinator.moveSegment(
@ -254,6 +270,13 @@ public class DruidCoordinatorTest extends CuratorTestBase
druidServer2.toImmutableDruidServer(),
segment, null
);
LoadPeonCallback loadCallback = loadCallbackCapture.getValue();
loadCallback.execute();
LoadPeonCallback dropCallback = dropCallbackCapture.getValue();
dropCallback.execute();
EasyMock.verify(druidServer);
EasyMock.verify(druidServer2);
EasyMock.verify(loadQueuePeon);
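
The assertions above rely on EasyMock's Capture to grab the LoadPeonCallback instances handed to loadSegment and dropSegment, so the test can drive them by hand after moveSegment returns. Here is a minimal standalone example of that capture-then-invoke pattern; the Callback and Service interfaces are made up for illustration, while the EasyMock calls are the real API:

```java
import org.easymock.Capture;
import org.easymock.EasyMock;

// Standalone illustration of the capture-then-invoke pattern used in testMoveSegment above.
public class CallbackCaptureSketch
{
  interface Callback
  {
    void execute();
  }

  interface Service
  {
    void doAsync(String arg, Callback callback);
  }

  public static void main(String[] args)
  {
    Service service = EasyMock.createMock(Service.class);
    Capture<Callback> captured = Capture.newInstance();

    // Expect one call and remember the callback argument instead of matching it exactly.
    service.doAsync(EasyMock.anyString(), EasyMock.capture(captured));
    EasyMock.expectLastCall().once();
    EasyMock.replay(service);

    service.doAsync("some-segment", () -> System.out.println("callback ran"));

    // Drive the captured callback by hand, just like the test drives the load/drop callbacks.
    captured.getValue().execute();
    EasyMock.verify(service);
  }
}
```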


@ -45,7 +45,6 @@ import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.CostBalancerStrategyFactory;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.LoadQueuePeonTester;
import io.druid.server.coordinator.ReplicationThrottler;
@ -110,9 +109,10 @@ public class LoadRuleTest
@Test
public void testLoad() throws Exception
{
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
@ -220,9 +220,10 @@ public class LoadRuleTest
@Test
public void testDrop() throws Exception
{
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
@ -334,9 +335,10 @@ public class LoadRuleTest
@Test
public void testLoadWithNonExistentTier() throws Exception
{
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
@ -426,9 +428,10 @@ public class LoadRuleTest
@Test
public void testDropWithNonExistentTier() throws Exception
{
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);