Remove throttling on drop segments (#3736)

* Remove throttling on drop

* Throttle loadqueuepeon segment change requests to ZK

* Make initial delay configurable, add docs, shutdown gracefully

* Make loadqueuepeon repeat delay configurable
Niketh Sabbineni 2017-01-20 10:02:19 -08:00 committed by Fangjin Yang
parent bb7c496d88
commit 2b8d3c102b
14 changed files with 141 additions and 157 deletions

View File

@ -33,6 +33,7 @@ The coordinator node uses several of the global configs in [Configuration](../co
|`druid.coordinator.kill.durationToRetain`| Do not kill segments in the last `durationToRetain`; must be greater than or equal to 0. Only applies and MUST be specified if kill is turned on. Note that the default value is invalid.|PT-1S (-1 seconds)|
|`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission; must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that the default value is invalid.|0|
|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`|
|`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon, which manages the load and drop of segments.|PT0.050S (50 ms)|
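For example, to have each peon poll its queue every 100 ms instead of the 50 ms default, the coordinator's runtime.properties would carry a line like the following (a sketch; only the property name and the ISO-8601 duration format come from the table above):

# coordinator runtime.properties (sketch)
druid.coordinator.loadqueuepeon.repeatDelay=PT0.100S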
### Metadata Retrieval

View File

@ -776,6 +776,7 @@ public class DruidCoordinator
if (!loadManagementPeons.containsKey(server.getName())) {
String basePath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName());
LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(basePath);
loadQueuePeon.start();
log.info("Creating LoadQueuePeon for server[%s] at path[%s]", server.getName(), basePath);
loadManagementPeons.put(server.getName(), loadQueuePeon);

View File

@ -80,4 +80,10 @@ public abstract class DruidCoordinatorConfig
{
return null;
}
@Config("druid.coordinator.loadqueuepeon.repeatDelay")
public Duration getLoadQueuePeonRepeatDelay()
{
return Duration.millis(50);
}
}
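The method body doubles as the default: Druid binds this abstract class through skife config-magic, which proxies the class and only overrides a @Config method when its property is actually set. A sketch of that behavior, using the same Config.createFactory helper the test further down uses; the wrapper class name here is invented:

import java.util.Properties;
import org.joda.time.Duration;

public class RepeatDelayConfigSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    // No property set: the abstract method body supplies the Duration.millis(50) default.
    DruidCoordinatorConfig config = Config.createFactory(props).build(DruidCoordinatorConfig.class);
    System.out.println(config.getLoadQueuePeonRepeatDelay()); // PT0.050S

    // An explicit property overrides the default, as in DruidCoordinatorConfigTest below.
    props.setProperty("druid.coordinator.loadqueuepeon.repeatDelay", "PT0.100S");
    config = Config.createFactory(props).build(DruidCoordinatorConfig.class);
    System.out.println(config.getLoadQueuePeonRepeatDelay()); // PT0.100S
  }
}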

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import com.metamx.emitter.EmittingLogger;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentChangeRequestDrop;
import io.druid.server.coordination.SegmentChangeRequestLoad;
@ -41,6 +42,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@ -154,7 +156,6 @@ public class LoadQueuePeon
log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
queuedSize.addAndGet(segment.getSize());
segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Arrays.asList(callback)));
doNext();
}
public void dropSegment(
@ -184,115 +185,98 @@ public class LoadQueuePeon
log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Arrays.asList(callback)));
doNext();
}
private void processSegmentChangeRequest()
{
  if (currentlyProcessing == null) {
    if (!segmentsToDrop.isEmpty()) {
      currentlyProcessing = segmentsToDrop.firstEntry().getValue();
      log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
    } else if (!segmentsToLoad.isEmpty()) {
      currentlyProcessing = segmentsToLoad.firstEntry().getValue();
      log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
    } else {
      return;
    }

    try {
      // Expected when the coordinator loses leadership and the LoadQueuePeon is stopped.
      if (currentlyProcessing == null) {
        if (!stopped) {
          log.makeAlert("Crazy race condition! server[%s]", basePath)
             .emit();
        }
        actionCompleted();
        return;
      }

      log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier());
      final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier());
      final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
      curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);

      processingExecutor.schedule(
          new Runnable()
          {
            @Override
            public void run()
            {
              try {
                if (curator.checkExists().forPath(path) != null) {
                  failAssign(new ISE("%s was never removed! Failing this operation!", path));
                }
              }
              catch (Exception e) {
                failAssign(e);
              }
            }
          },
          config.getLoadTimeoutDelay().getMillis(),
          TimeUnit.MILLISECONDS
      );

      final Stat stat = curator.checkExists().usingWatcher(
          new CuratorWatcher()
          {
            @Override
            public void process(WatchedEvent watchedEvent) throws Exception
            {
              switch (watchedEvent.getType()) {
                case NodeDeleted:
                  entryRemoved(watchedEvent.getPath());
              }
            }
          }
      ).forPath(path);

      if (stat == null) {
        final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());

        // Create a node and then delete it to remove the registered watcher. This is a work-around for
        // a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event
        // that happens for that node. If no events happen, the watcher stays registered forever.
        // Couple that with the fact that you cannot set a watcher when you create a node, but what we
        // want is to create a node and then watch for it to get deleted. The solution is that you *can*
        // set a watcher when you check to see if it exists, so we first create the node and then set a
        // watcher on its existence. However, if the node already does not exist by the time the existence
        // check returns, then the watcher that was set will never fire (nobody will ever create the node
        // again) and thus lead to a slow, but real, memory leak. So, we create another node to cause
        // that watcher to fire and delete it right away.
        //
        // We do not create the existence watcher first, because then it will fire when we create the
        // node and we'll have the same race when trying to refresh that watcher.
        curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
        entryRemoved(path);
      }
    }
    catch (Exception e) {
      failAssign(e);
    }
  } else {
    log.info(
        "Server[%s] skipping doNext() because something is currently loading[%s].",
        basePath,
        currentlyProcessing.getSegmentIdentifier()
    );
  }
}
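The handshake encoded above: the coordinator writes the change request as an ephemeral znode under the server's load queue path, the historical acts on it and deletes the node, and the deletion watcher drives entryRemoved(). A standalone sketch of that pattern using Curator's TestingServer; the path and payload are illustrative, and the delete() call stands in for the historical acknowledging the request:

import java.util.concurrent.CountDownLatch;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;

public class ChangeRequestHandshakeSketch
{
  public static void main(String[] args) throws Exception
  {
    try (TestingServer zk = new TestingServer()) {
      CuratorFramework curator = CuratorFrameworkFactory.newClient(
          zk.getConnectString(), new ExponentialBackoffRetry(1000, 3)
      );
      curator.start();

      // Hypothetical load queue path for one historical server.
      final String path = "/druid/loadQueue/historical-1/segment-1";
      final CountDownLatch deleted = new CountDownLatch(1);

      // 1) Coordinator side: write the change request as an ephemeral node.
      curator.create().creatingParentsIfNeeded()
             .withMode(CreateMode.EPHEMERAL)
             .forPath(path, "{\"action\":\"load\"}".getBytes());

      // 2) Set the deletion watcher via the existence check, mirroring LoadQueuePeon above.
      curator.checkExists().usingWatcher((Watcher) event -> {
        if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
          deleted.countDown(); // entryRemoved() in the real peon
        }
      }).forPath(path);

      // 3) Stand-in for the historical: process the request, then delete the node.
      curator.delete().forPath(path);

      deleted.await();
      System.out.println("change request acknowledged");
      curator.close();
    }
  }
}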
@ -326,6 +310,29 @@ public class LoadQueuePeon
}
}
public void start()
{
ScheduledExecutors.scheduleAtFixedRate(
processingExecutor,
config.getLoadQueuePeonRepeatDelay(),
config.getLoadQueuePeonRepeatDelay(),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call()
{
processSegmentChangeRequest();
if (stopped) {
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
}
}
);
}
public void stop()
{
synchronized (lock) {
@ -371,8 +378,6 @@ public class LoadQueuePeon
actionCompleted();
log.info("Server[%s] done processing [%s]", basePath, path);
}
doNext();
}
private void failAssign(Exception e)
@ -382,7 +387,6 @@ public class LoadQueuePeon
failedAssignCount.getAndIncrement();
// Act like it was completed so that the coordinator gives it to someone else
actionCompleted();
doNext();
}
}
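The net effect of this file's change: loadSegment() and dropSegment() now only enqueue, and the repeating task started in start() drains the queue every repeatDelay until stop() is called, instead of each completed action recursively calling doNext(). A minimal, self-contained sketch of the Signal-driven repeat pattern (Druid's ScheduledExecutors.scheduleAtFixedRate wraps exactly this kind of self-rescheduling; everything below besides the STOP/REPEAT names is illustrative):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RepeatUntilStoppedSketch
{
  enum Signal { REPEAT, STOP }

  public static void main(String[] args) throws Exception
  {
    final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
    final AtomicBoolean stopped = new AtomicBoolean(false);
    final long delayMs = 50; // plays the role of druid.coordinator.loadqueuepeon.repeatDelay

    final Runnable poll = new Runnable()
    {
      @Override
      public void run()
      {
        // processSegmentChangeRequest() would run here.
        Signal signal = stopped.get() ? Signal.STOP : Signal.REPEAT;
        if (signal == Signal.REPEAT) {
          // Re-arm only while running; returning STOP ends the cycle, as in LoadQueuePeon.start().
          exec.schedule(this, delayMs, TimeUnit.MILLISECONDS);
        }
      }
    };
    exec.schedule(poll, delayMs, TimeUnit.MILLISECONDS); // initial delay == repeat delay

    Thread.sleep(200);
    stopped.set(true); // stop() flips the stopped flag; the next tick returns STOP
    exec.shutdown();
  }
}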

View File

@ -28,16 +28,14 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* The ReplicationThrottler is used to throttle the number of replicants that are created and destroyed.
* The ReplicationThrottler is used to throttle the number of replicants that are created.
*/
public class ReplicationThrottler
{
private static final EmittingLogger log = new EmittingLogger(ReplicationThrottler.class);
private final Map<String, Boolean> replicatingLookup = Maps.newHashMap();
private final Map<String, Boolean> terminatingLookup = Maps.newHashMap();
private final ReplicatorSegmentHolder currentlyReplicating = new ReplicatorSegmentHolder();
private final ReplicatorSegmentHolder currentlyTerminating = new ReplicatorSegmentHolder();
private volatile int maxReplicants;
private volatile int maxLifetime;
@ -58,11 +56,6 @@ public class ReplicationThrottler
update(tier, currentlyReplicating, replicatingLookup, "create");
}
public void updateTerminationState(String tier)
{
update(tier, currentlyTerminating, terminatingLookup, "terminate");
}
private void update(String tier, ReplicatorSegmentHolder holder, Map<String, Boolean> lookup, String type)
{
int size = holder.getNumProcessing(tier);
@ -95,11 +88,6 @@ public class ReplicationThrottler
return replicatingLookup.get(tier) && !currentlyReplicating.isAtMaxReplicants(tier);
}
public boolean canDestroyReplicant(String tier)
{
return terminatingLookup.get(tier) && !currentlyTerminating.isAtMaxReplicants(tier);
}
public void registerReplicantCreation(String tier, String segmentId, String serverId)
{
currentlyReplicating.addSegment(tier, segmentId, serverId);
@ -110,16 +98,6 @@ public class ReplicationThrottler
currentlyReplicating.removeSegment(tier, segmentId, serverId);
}
public void registerReplicantTermination(String tier, String segmentId, String serverId)
{
currentlyTerminating.addSegment(tier, segmentId, serverId);
}
public void unregisterReplicantTermination(String tier, String segmentId, String serverId)
{
currentlyTerminating.removeSegment(tier, segmentId, serverId);
}
private class ReplicatorSegmentHolder
{
private final Map<String, ConcurrentHashMap<String, String>> currentlyProcessingSegments = Maps.newHashMap();
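With the termination half deleted, the throttler's only remaining job is gating replica creation; drops go straight to the peon with a null callback (see the LoadRule hunk below). A fragment sketching the surviving create-side flow against the methods kept above; canCreateReplicant is the check whose body appears in this hunk, and the tier/segment/server identifiers are illustrative:

// Sketch only: mirrors the create-side path that LoadRule still throttles.
if (replicationManager.canCreateReplicant(tier)) {
  replicationManager.registerReplicantCreation(tier, segment.getIdentifier(), server.getHost());
  peon.loadSegment(
      segment,
      new LoadPeonCallback()
      {
        @Override
        public void execute()
        {
          // Free the replication slot once the historical finishes loading.
          replicationManager.unregisterReplicantCreation(tier, segment.getIdentifier(), server.getHost());
        }
      }
  );
}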

View File

@ -118,7 +118,6 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
for (String tier : cluster.getTierNames()) {
replicatorThrottler.updateReplicationState(tier);
replicatorThrottler.updateTerminationState(tier);
}
DruidCoordinatorRuntimeParams paramsWithReplicationManager = params.buildFromExistingWithoutAvailableSegments()

View File

@ -203,33 +203,9 @@ public abstract class LoadRule implements Rule
}
if (holder.isServingSegment(segment)) {
if (expectedNumReplicantsForTier > 0) { // don't throttle unless we are removing extra replicants
if (!replicationManager.canDestroyReplicant(tier)) {
serverQueue.add(holder);
break;
}
replicationManager.registerReplicantTermination(
tier,
segment.getIdentifier(),
holder.getServer().getHost()
);
}
holder.getPeon().dropSegment(
segment,
new LoadPeonCallback()
{
@Override
public void execute()
{
replicationManager.unregisterReplicantTermination(
tier,
segment.getIdentifier(),
holder.getServer().getHost()
);
}
}
null
);
--loadedNumReplicantsForTier;
stats.addToTieredStat(droppedCount, tier, 1);

View File

@ -51,6 +51,7 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(0, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay());
Assert.assertNull(config.getConsoleStatic());
Assert.assertEquals(Duration.millis(50), config.getLoadQueuePeonRepeatDelay());
//with non-defaults
Properties props = new Properties();
@ -65,6 +66,7 @@ public class DruidCoordinatorConfigTest
props.setProperty("druid.coordinator.kill.maxSegments", "10000");
props.setProperty("druid.coordinator.load.timeout", "PT1s");
props.setProperty("druid.coordinator.console.static", "test");
props.setProperty("druid.coordinator.loadqueuepeon.repeatDelay", "PT0.100s");
factory = Config.createFactory(props);
config = factory.build(DruidCoordinatorConfig.class);
@ -80,5 +82,6 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(10000, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration("PT1s"), config.getLoadTimeoutDelay());
Assert.assertEquals("test", config.getConsoleStatic());
Assert.assertEquals(Duration.millis(100), config.getLoadQueuePeonRepeatDelay());
}
}

View File

@ -1268,7 +1268,8 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 24);
// There is no throttling on drop
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 25);
EasyMock.verify(mockPeon);
exec.shutdown();
}

View File

@ -130,7 +130,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
10,
null,
false,
false
false,
new Duration("PT0s")
);
pathChildrenCache = new PathChildrenCache(curator, LOADPATH, true, true, Execs.singleThreaded("coordinator_test_path_children_cache-%d"));
loadQueuePeon = new LoadQueuePeon(
@ -141,6 +142,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
Execs.singleThreaded("coordinator_test_load_queue_peon-%d"),
druidCoordinatorConfig
);
loadQueuePeon.start();
druidNode = new DruidNode("hey", "what", 1234);
loadManagementPeons = new MapMaker().makeMap();
scheduledExecutorFactory = new ScheduledExecutorFactory()
@ -197,6 +199,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
@After
public void tearDown() throws Exception
{
loadQueuePeon.stop();
pathChildrenCache.close();
tearDownServerAndCurator();
}

View File

@ -88,9 +88,11 @@ public class LoadQueuePeonTest extends CuratorTestBase
jsonMapper,
Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
Execs.singleThreaded("test_load_queue_peon-%d"),
new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false)
new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO)
);
loadQueuePeon.start();
final CountDownLatch[] loadRequestSignal = new CountDownLatch[5];
final CountDownLatch[] dropRequestSignal = new CountDownLatch[5];
final CountDownLatch[] segmentLoadedSignal = new CountDownLatch[5];
@ -294,9 +296,11 @@ public class LoadQueuePeonTest extends CuratorTestBase
Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
Execs.singleThreaded("test_load_queue_peon-%d"),
// set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly
new TestDruidCoordinatorConfig(null, null, null, new Duration(1), null, null, 10, null, false, false)
new TestDruidCoordinatorConfig(null, null, null, new Duration(1), null, null, 10, null, false, false, new Duration("PT1s"))
);
loadQueuePeon.start();
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
{

View File

@ -30,6 +30,7 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
private final Duration loadTimeoutDelay;
private final Duration coordinatorKillPeriod;
private final Duration coordinatorKillDurationToRetain;
private final Duration getLoadQueuePeonRepeatDelay;
private final int coordinatorKillMaxSegments;
private final String consoleStatic;
@ -47,7 +48,8 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
int coordinatorKillMaxSegments,
String consoleStatic,
boolean mergeSegments,
boolean convertSegments
boolean convertSegments,
Duration getLoadQueuePeonRepeatDelay
)
{
this.coordinatorStartDelay = coordinatorStartDelay;
@ -60,6 +62,7 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
this.consoleStatic = consoleStatic;
this.mergeSegments = mergeSegments;
this.convertSegments = convertSegments;
this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
}
@Override
@ -121,4 +124,9 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
{
return consoleStatic;
}
@Override public Duration getLoadQueuePeonRepeatDelay()
{
return getLoadQueuePeonRepeatDelay;
}
}

View File

@ -111,7 +111,8 @@ public class DruidCoordinatorSegmentKillerTest
1000,
null,
false,
false
false,
Duration.ZERO
)
);

View File

@ -89,7 +89,6 @@ public class LoadRuleTest
throttler = new ReplicationThrottler(2, 1);
for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
throttler.updateReplicationState(tier);
throttler.updateTerminationState(tier);
}
segment = new DataSegment(
"foo",