Improve parallelism of zookeeper based segment change processing (#7088)

* V1 - improve parallelism of zookeeper based segment change processing

* Create zk nodes in batches. Address code review comments.
Introduce various configs.

* Add documentation for the newly added configs

* Fix test failures

* Fix more test failures

* Remove printStackTrace statements

* Address code review comments

* Use a single queue

* Address code review comments

Since we have a separate load peon for every historical, just having a single SegmentChangeProcessor
task per historical is enough. This commit also gets rid of the associated config druid.coordinator.loadqueuepeon.curator.numCreateThreads

* Resolve merge conflict

* Fix compilation failure

* Remove batching since we already have a dynamic config maxSegmentsInNodeLoadingQueue that provides that control

* Fix NPE in test

* Remove documentation for configs that are no longer needed

* Address code review comments

* Address more code review comments

* Fix checkstyle issue

* Address code review comments

* Code review comments

* Add back monitor node remove executor

* Clean up code to isolate null checks and minor refactoring

* Change param name since it conflicts with member variable name
Samarth Jain 2019-05-03 06:58:42 -07:00 committed by Roman Leventov
parent a013350018
commit afbcb9c07f
14 changed files with 431 additions and 387 deletions

View File

@@ -1254,8 +1254,8 @@ These Historical configurations can be defined in the `historical/runtime.properties`
|`druid.segmentCache.dropSegmentDelayMillis`|How long a process delays before completely dropping segment.|30000 (30 seconds)|
|`druid.segmentCache.infoDir`|Historical processes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
|`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)|
|`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage.|10|
|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently from local storage at startup.|Same as numLoadingThreads|
|`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage. Note that loading a segment involves downloading it from deep storage, decompressing it, and loading it into a memory-mapped location, so the work is not all I/O bound. Depending on CPU and network load, one could possibly increase this config to a higher value.|Number of cores|
|`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. One might want to increase this number when noticing that clusters are lagging behind in balancing segments across historical nodes.|2|
In `druid.segmentCache.locations`, *freeSpacePercent* was added because the *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. If a Druid bug leaves unaccounted segment files on disk, or some other process writes to the disk, this check can start failing segment loading early, before the disk fills up completely, leaving the host otherwise usable.
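As a concrete illustration (the values below are hypothetical, not recommendations): with the new default, a 16-core Historical gets 16 loading threads automatically. An operator whose deep-storage downloads saturate the network could cap them explicitly in `historical/runtime.properties`:

druid.segmentCache.numLoadingThreads=8

The coordinator-side callback pool can be tuned analogously, via the `numCallbackThreads` property documented above, in the Coordinator's runtime properties.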

View File

@@ -22,6 +22,7 @@ package org.apache.druid.segment.loading;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.utils.JvmUtils;
import org.hibernate.validator.constraints.NotEmpty;
import java.io.File;
@@ -46,7 +47,7 @@ public class SegmentLoaderConfig
private int announceIntervalMillis = 0; // do not background announce
@JsonProperty("numLoadingThreads")
private int numLoadingThreads = 10;
private int numLoadingThreads = JvmUtils.getRuntimeInfo().getAvailableProcessors();
@JsonProperty("numBootstrapThreads")
private Integer numBootstrapThreads = null;
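For reference, the changed default above ties the loading-thread count to the JVM's visible core count. A minimal plain-JDK sketch of what it resolves to (assuming Druid's JvmUtils wrapper reports the same number):

// Illustrative equivalent of the new default: one loading thread per available core.
int numLoadingThreads = Runtime.getRuntime().availableProcessors();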

View File

@@ -32,9 +32,11 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
/**
* Use {@link org.apache.druid.server.coordinator.HttpLoadQueuePeon} for segment load/drops.
@@ -54,6 +56,7 @@ public class ZkCoordinator
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started = false;
private final ExecutorService segmentLoadUnloadService;
@Inject
public ZkCoordinator(
@@ -61,7 +64,8 @@ public class ZkCoordinator
ObjectMapper jsonMapper,
ZkPathsConfig zkPaths,
DruidServerMetadata me,
CuratorFramework curator
CuratorFramework curator,
SegmentLoaderConfig config
)
{
this.dataSegmentChangeHandler = loadDropHandler;
@@ -69,6 +73,10 @@ public class ZkCoordinator
this.zkPaths = zkPaths;
this.me = me;
this.curator = curator;
this.segmentLoadUnloadService = Execs.multiThreaded(
config.getNumLoadingThreads(),
"ZKCoordinator--%d"
);
}
@LifecycleStart
@@ -102,63 +110,12 @@ public class ZkCoordinator
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
{
final ChildData child = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
final String path = child.getPath();
final DataSegmentChangeRequest request = jsonMapper.readValue(
child.getData(), DataSegmentChangeRequest.class
);
log.info("New request[%s] with zNode[%s].", request.asString(), path);
try {
request.go(
dataSegmentChangeHandler,
new DataSegmentChangeCallback()
{
boolean hasRun = false;
@Override
public void execute()
{
try {
if (!hasRun) {
curator.delete().guaranteed().forPath(path);
log.info("Completed request [%s]", request.asString());
hasRun = true;
}
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}
log.error(e, "Exception while removing zNode[%s]", path);
throw new RuntimeException(e);
}
}
}
);
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}
log.makeAlert(e, "Segment load/unload: uncaught exception.")
.addData("node", path)
.addData("nodeProperties", request)
.emit();
}
childAdded(child);
break;
case CHILD_REMOVED:
log.info("zNode[%s] was removed", event.getData().getPath());
@@ -168,6 +125,7 @@ public class ZkCoordinator
}
}
}
);
loadQueueCache.start();
}
@@ -180,6 +138,59 @@ public class ZkCoordinator
}
}
private void childAdded(ChildData child)
{
segmentLoadUnloadService.submit(() -> {
final String path = child.getPath();
DataSegmentChangeRequest request = new SegmentChangeRequestNoop();
try {
final DataSegmentChangeRequest finalRequest = jsonMapper.readValue(
child.getData(),
DataSegmentChangeRequest.class
);
finalRequest.go(
dataSegmentChangeHandler,
new DataSegmentChangeCallback()
{
@Override
public void execute()
{
try {
curator.delete().guaranteed().forPath(path);
log.info("Completed request [%s]", finalRequest.asString());
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}
log.error(e, "Exception while removing zNode[%s]", path);
throw new RuntimeException(e);
}
}
}
);
}
catch (Exception e) {
// Something went wrong in either deserializing the request using jsonMapper or when invoking it
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}
log.makeAlert(e, "Segment load/unload: uncaught exception.")
.addData("node", path)
.addData("nodeProperties", request)
.emit();
}
});
}
@LifecycleStop
public void stop()
{

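To make the new control flow concrete, here is a minimal, self-contained sketch of the pattern the ZkCoordinator diff above adopts. It is not the diff's code, and handleRequest/deleteZNode are hypothetical stand-ins for the jsonMapper deserialization and curator delete shown above; the point is that the ZooKeeper watcher thread only enqueues, while a fixed-size pool does the slow load/drop work:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ChildAddedDispatch
{
  // Sized like the diff's Execs.multiThreaded(config.getNumLoadingThreads(), "ZKCoordinator--%d").
  private final ExecutorService segmentLoadUnloadService =
      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

  // Invoked from the PathChildrenCache listener on CHILD_ADDED; returns immediately
  // so the watcher thread can keep delivering events for other zNodes.
  void childAdded(byte[] zNodeData, Runnable deleteZNode)
  {
    segmentLoadUnloadService.submit(() -> {
      handleRequest(zNodeData); // hypothetical: deserialize and run the DataSegmentChangeRequest
      deleteZNode.run();        // delete the zNode so the coordinator sees completion
    });
  }

  private void handleRequest(byte[] data)
  {
    // placeholder for jsonMapper.readValue(...) followed by request.go(...)
  }
}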
View File

@@ -21,36 +21,46 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.server.coordination.SegmentChangeRequestNoop;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* Use {@link HttpLoadQueuePeon} instead.
* <p>
* Objects of this class can be accessed by multiple threads. State-wise, this class
* is thread-safe, and callers of the public methods can expect thread-safe behavior.
* However, as with any object accessed by multiple threads, callers shouldn't expect
* strict consistency in results between two calls of the same or different methods.
*/
@Deprecated
public class CuratorLoadQueuePeon extends LoadQueuePeon
@@ -59,40 +69,48 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
private static final int DROP = 0;
private static final int LOAD = 1;
private static void executeCallbacks(List<LoadPeonCallback> callbacks)
{
for (LoadPeonCallback callback : callbacks) {
if (callback != null) {
callback.execute();
}
}
}
private final CuratorFramework curator;
private final String basePath;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService processingExecutor;
/**
* Threadpool with daemon threads that execute callback actions associated
* with loading or dropping segments.
*/
private final ExecutorService callBackExecutor;
private final DruidCoordinatorConfig config;
private final AtomicLong queuedSize = new AtomicLong(0);
private final AtomicInteger failedAssignCount = new AtomicInteger(0);
/**
* Needs to be thread safe since it can be concurrently accessed via
* {@link #loadSegment(DataSegment, LoadPeonCallback)}, {@link #actionCompleted(SegmentHolder)},
* {@link #getSegmentsToLoad()} and {@link #stop()}
*/
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
/**
* Needs to be thread safe since it can be concurrently accessed via
* {@link #dropSegment(DataSegment, LoadPeonCallback)}, {@link #actionCompleted(SegmentHolder)},
* {@link #getSegmentsToDrop()} and {@link #stop()}
*/
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
/**
* Needs to be thread safe since it can be concurrently accessed via
* {@link #markSegmentToDrop(DataSegment)}}, {@link #unmarkSegmentToDrop(DataSegment)}}
* and {@link #getSegmentsToDrop()}
*/
private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
private final Object lock = new Object();
private volatile SegmentHolder currentlyProcessing = null;
private boolean stopped = false;
CuratorLoadQueuePeon(
CuratorFramework curator,
String basePath,
@@ -150,61 +168,30 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
}
@Override
public void loadSegment(final DataSegment segment, final LoadPeonCallback callback)
public void loadSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback)
{
synchronized (lock) {
if ((currentlyProcessing != null) &&
currentlyProcessing.getSegmentId().equals(segment.getId())) {
if (callback != null) {
currentlyProcessing.addCallback(callback);
}
return;
}
SegmentHolder segmentHolder = new SegmentHolder(segment, LOAD, Collections.singletonList(callback));
final SegmentHolder existingHolder = segmentsToLoad.putIfAbsent(segment, segmentHolder);
if (existingHolder != null) {
existingHolder.addCallback(callback);
return;
}
synchronized (lock) {
final SegmentHolder existingHolder = segmentsToLoad.get(segment);
if (existingHolder != null) {
if ((callback != null)) {
existingHolder.addCallback(callback);
}
return;
}
}
log.debug("Asking server peon[%s] to load segment[%s]", basePath, segment.getId());
queuedSize.addAndGet(segment.getSize());
segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Collections.singletonList(callback)));
processingExecutor.submit(new SegmentChangeProcessor(segmentHolder));
}
@Override
public void dropSegment(
final DataSegment segment,
final LoadPeonCallback callback
)
public void dropSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback)
{
synchronized (lock) {
if ((currentlyProcessing != null) &&
currentlyProcessing.getSegmentId().equals(segment.getId())) {
if (callback != null) {
currentlyProcessing.addCallback(callback);
}
return;
}
SegmentHolder segmentHolder = new SegmentHolder(segment, DROP, Collections.singletonList(callback));
final SegmentHolder existingHolder = segmentsToDrop.putIfAbsent(segment, segmentHolder);
if (existingHolder != null) {
existingHolder.addCallback(callback);
return;
}
synchronized (lock) {
final SegmentHolder existingHolder = segmentsToDrop.get(segment);
if (existingHolder != null) {
if (callback != null) {
existingHolder.addCallback(callback);
}
return;
}
}
log.debug("Asking server peon[%s] to drop segment[%s]", basePath, segment.getId());
segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Collections.singletonList(callback)));
processingExecutor.submit(new SegmentChangeProcessor(segmentHolder));
}
@Override
@@ -219,206 +206,198 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
segmentsMarkedToDrop.remove(dataSegment);
}
private void processSegmentChangeRequest()
private class SegmentChangeProcessor implements Runnable
{
if (currentlyProcessing != null) {
log.debug(
"Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].",
basePath,
currentlyProcessing.getSegmentId()
);
private final SegmentHolder segmentHolder;
return;
private SegmentChangeProcessor(SegmentHolder segmentHolder)
{
this.segmentHolder = segmentHolder;
}
if (!segmentsToDrop.isEmpty()) {
currentlyProcessing = segmentsToDrop.firstEntry().getValue();
log.debug("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentId());
} else if (!segmentsToLoad.isEmpty()) {
currentlyProcessing = segmentsToLoad.firstEntry().getValue();
log.debug("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentId());
} else {
return;
@Override
public void run()
{
try {
final String path = ZKPaths.makePath(basePath, segmentHolder.getSegmentIdentifier());
final byte[] payload = jsonMapper.writeValueAsBytes(segmentHolder.getChangeRequest());
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
log.debug(
"ZKNode created for server to [%s] %s [%s]",
basePath,
segmentHolder.getType() == LOAD ? "load" : "drop",
segmentHolder.getSegmentIdentifier()
);
final ScheduledFuture<?> nodeDeletedCheck = scheduleNodeDeletedCheck(path);
final Stat stat = curator.checkExists().usingWatcher(
(CuratorWatcher) watchedEvent -> {
switch (watchedEvent.getType()) {
case NodeDeleted:
// Cancel the check node deleted task since we have already
// been notified by the zk watcher
nodeDeletedCheck.cancel(true);
entryRemoved(segmentHolder, watchedEvent.getPath());
break;
default:
// do nothing
}
}
).forPath(path);
if (stat == null) {
final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
// Create a node and then delete it to remove the registered watcher. This is a work-around for
// a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event
// that happens for that node. If no events happen, the watcher stays registered forever.
// Couple that with the fact that you cannot set a watcher when you create a node, but what we
// want is to create a node and then watch for it to get deleted. The solution is that you *can*
// set a watcher when you check to see if it exists so, we first create the node and then set a
// watcher on its existence. However, if the node already does not exist by the time the existence check
// returns, then the watcher that was set will never fire (nobody will ever create the node
// again) and thus lead to a slow, but real, memory leak. So, we create another node to cause
// that watcher to fire and delete it right away.
//
// We do not create the existence watcher first, because then it will fire when we create the
// node and we'll have the same race when trying to refresh that watcher.
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
entryRemoved(segmentHolder, path);
}
}
catch (KeeperException.NodeExistsException ne) {
// This is expected when historicals haven't yet picked up processing this segment and coordinator
// tries reassigning it to the same node.
log.warn(ne, "ZK node already exists because segment change request hasn't yet been processed");
failAssign(segmentHolder);
}
catch (Exception e) {
failAssign(segmentHolder, e);
}
}
try {
final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentId().toString());
final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
processingExecutor.schedule(
@Nonnull
private ScheduledFuture<?> scheduleNodeDeletedCheck(String path)
{
return processingExecutor.schedule(
() -> {
try {
if (curator.checkExists().forPath(path) != null) {
failAssign(new ISE("%s was never removed! Failing this operation!", path));
failAssign(segmentHolder, new ISE("%s was never removed! Failing this operation!", path));
} else {
log.debug("%s detected to be removed. ", path);
}
}
catch (Exception e) {
failAssign(e);
log.error(e, "Exception caught and ignored when checking whether zk node was deleted");
failAssign(segmentHolder, e);
}
},
config.getLoadTimeoutDelay().getMillis(),
TimeUnit.MILLISECONDS
);
final Stat stat = curator.checkExists().usingWatcher(
(CuratorWatcher) watchedEvent -> {
switch (watchedEvent.getType()) {
case NodeDeleted:
entryRemoved(watchedEvent.getPath());
break;
default:
// do nothing
}
}
).forPath(path);
if (stat == null) {
final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
// Create a node and then delete it to remove the registered watcher. This is a work-around for
// a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event
// that happens for that node. If no events happen, the watcher stays registered forever.
// Couple that with the fact that you cannot set a watcher when you create a node, but what we
// want is to create a node and then watch for it to get deleted. The solution is that you *can*
// set a watcher when you check to see if it exists so, we first create the node and then set a
// watcher on its existence. However, if the node already does not exist by the time the existence check
// returns, then the watcher that was set will never fire (nobody will ever create the node
// again) and thus lead to a slow, but real, memory leak. So, we create another node to cause
// that watcher to fire and delete it right away.
//
// We do not create the existence watcher first, because then it will fire when we create the
// node and we'll have the same race when trying to refresh that watcher.
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
entryRemoved(path);
}
}
catch (Exception e) {
failAssign(e);
}
}
private void actionCompleted()
private void actionCompleted(SegmentHolder segmentHolder)
{
if (currentlyProcessing != null) {
switch (currentlyProcessing.getType()) {
case LOAD:
segmentsToLoad.remove(currentlyProcessing.getSegment());
queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
break;
case DROP:
segmentsToDrop.remove(currentlyProcessing.getSegment());
break;
default:
throw new UnsupportedOperationException();
}
final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
currentlyProcessing = null;
callBackExecutor.execute(
() -> executeCallbacks(callbacks)
);
switch (segmentHolder.getType()) {
case LOAD:
segmentsToLoad.remove(segmentHolder.getSegment());
queuedSize.addAndGet(-segmentHolder.getSegmentSize());
break;
case DROP:
segmentsToDrop.remove(segmentHolder.getSegment());
break;
default:
throw new UnsupportedOperationException();
}
executeCallbacks(segmentHolder);
}
@Override
public void start()
{
ScheduledExecutors.scheduleAtFixedRate(
processingExecutor,
config.getLoadQueuePeonRepeatDelay(),
config.getLoadQueuePeonRepeatDelay(),
() -> {
processSegmentChangeRequest();
if (stopped) {
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
}
);
}
{ }
@Override
public void stop()
{
synchronized (lock) {
if (currentlyProcessing != null) {
executeCallbacks(currentlyProcessing.getCallbacks());
currentlyProcessing = null;
}
if (!segmentsToDrop.isEmpty()) {
for (SegmentHolder holder : segmentsToDrop.values()) {
executeCallbacks(holder.getCallbacks());
}
}
segmentsToDrop.clear();
if (!segmentsToLoad.isEmpty()) {
for (SegmentHolder holder : segmentsToLoad.values()) {
executeCallbacks(holder.getCallbacks());
}
}
segmentsToLoad.clear();
queuedSize.set(0L);
failedAssignCount.set(0);
stopped = true;
for (SegmentHolder holder : segmentsToDrop.values()) {
executeCallbacks(holder);
}
segmentsToDrop.clear();
for (SegmentHolder holder : segmentsToLoad.values()) {
executeCallbacks(holder);
}
segmentsToLoad.clear();
queuedSize.set(0L);
failedAssignCount.set(0);
processingExecutor.shutdown();
callBackExecutor.shutdown();
}
private void entryRemoved(String path)
private void entryRemoved(SegmentHolder segmentHolder, String path)
{
synchronized (lock) {
if (currentlyProcessing == null) {
log.warn("Server[%s] an entry[%s] was removed even though it wasn't loading!?", basePath, path);
return;
}
if (!ZKPaths.getNodeFromPath(path).equals(currentlyProcessing.getSegmentId().toString())) {
log.warn(
"Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]",
basePath, path, currentlyProcessing
);
return;
}
log.debug(
"Server[%s] done processing %s of segment [%s]",
basePath,
currentlyProcessing.getType() == LOAD ? "load" : "drop",
path
if (!ZKPaths.getNodeFromPath(path).equals(segmentHolder.getSegmentIdentifier())) {
log.warn(
"Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]",
basePath, path, segmentHolder
);
actionCompleted();
return;
}
actionCompleted(segmentHolder);
log.debug(
"Server[%s] done processing %s of segment [%s]",
basePath,
segmentHolder.getType() == LOAD ? "load" : "drop",
path
);
}
private void failAssign(Exception e)
private void failAssign(SegmentHolder segmentHolder)
{
synchronized (lock) {
log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyProcessing);
failedAssignCount.getAndIncrement();
// Act like it was completed so that the coordinator gives it to someone else
actionCompleted();
}
failAssign(segmentHolder, null);
}
private void failAssign(SegmentHolder segmentHolder, Exception e)
{
if (e != null) {
log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, segmentHolder);
}
failedAssignCount.getAndIncrement();
// Act like it was completed so that the coordinator gives it to someone else
actionCompleted(segmentHolder);
}
private static class SegmentHolder
{
private final DataSegment segment;
private final DataSegmentChangeRequest changeRequest;
private final int type;
// Guaranteed to store only non-null elements
private final List<LoadPeonCallback> callbacks = new ArrayList<>();
private SegmentHolder(DataSegment segment, int type, Collection<LoadPeonCallback> callbacks)
private SegmentHolder(
DataSegment segment,
int type,
Collection<LoadPeonCallback> callbacksParam
)
{
this.segment = segment;
this.type = type;
this.changeRequest = (type == LOAD)
? new SegmentChangeRequestLoad(segment)
: new SegmentChangeRequestDrop(segment);
this.callbacks.addAll(callbacks);
Iterator<LoadPeonCallback> itr = callbacksParam.iterator();
while (itr.hasNext()) {
LoadPeonCallback c = itr.next();
if (c != null) {
callbacks.add(c);
}
}
}
public DataSegment getSegment()
@@ -431,9 +410,9 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
return type;
}
public SegmentId getSegmentId()
public String getSegmentIdentifier()
{
return segment.getId();
return segment.getId().toString();
}
public long getSegmentSize()
@@ -441,24 +420,20 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
return segment.getSize();
}
public void addCallbacks(Collection<LoadPeonCallback> newCallbacks)
public void addCallback(@Nullable LoadPeonCallback newCallback)
{
synchronized (callbacks) {
callbacks.addAll(newCallbacks);
if (newCallback != null) {
synchronized (callbacks) {
callbacks.add(newCallback);
}
}
}
public void addCallback(LoadPeonCallback newCallback)
List<LoadPeonCallback> snapshotCallbacks()
{
synchronized (callbacks) {
callbacks.add(newCallback);
}
}
public List<LoadPeonCallback> getCallbacks()
{
synchronized (callbacks) {
return callbacks;
// Return an immutable copy so that callers don't have to worry about concurrent modification
return ImmutableList.copyOf(callbacks);
}
}
@@ -473,4 +448,11 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
return changeRequest.toString();
}
}
private void executeCallbacks(SegmentHolder holder)
{
for (LoadPeonCallback callback : holder.snapshotCallbacks()) {
callBackExecutor.submit(() -> callback.execute());
}
}
}
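The long comment in the diff above describes the create-then-watch workaround for the ZooKeeper watcher race. A condensed, hedged sketch of just that interaction, using the same Curator calls the diff uses (error handling and the scheduled timeout check omitted):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;

class CreateThenWatchSketch
{
  // Creates the change-request node and arranges for onDone to run once the
  // historical deletes it (i.e. once the request has been processed).
  static void createAndWatch(
      CuratorFramework curator,
      String path,
      byte[] payload,
      byte[] noopPayload,
      Runnable onDone
  ) throws Exception
  {
    curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
    // A watcher cannot be attached at create time, so watch for deletion via an existence check.
    Stat stat = curator.checkExists().usingWatcher(
        (CuratorWatcher) event -> {
          if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
            onDone.run();
          }
        }
    ).forPath(path);
    if (stat == null) {
      // The node was already deleted before the watcher registered: re-create it with a
      // noop payload so the watcher fires instead of leaking, and treat the request as done.
      curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
      onDone.run();
    }
  }
}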

View File

@@ -75,6 +75,12 @@ public abstract class DruidCoordinatorConfig
return "curator";
}
@Config("druid.coordinator.curator.loadqueuepeon.numCallbackThreads")
public int getNumCuratorCallBackThreads()
{
return 2;
}
@Config("druid.coordinator.loadqueuepeon.http.repeatDelay")
public Duration getHttpLoadQueuePeonRepeatDelay()
{

View File

@@ -133,7 +133,8 @@ public class ZkCoordinatorTest extends CuratorTestBase
jsonMapper,
zkPaths,
me,
curator
curator,
new SegmentLoaderConfig()
);
zkCoordinator.start();

View File

@@ -162,7 +162,8 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
null,
false,
false,
new Duration("PT0s")
new Duration("PT0s"),
Duration.millis(10)
);
sourceLoadQueueChildrenCache = new PathChildrenCache(
curator,

View File

@@ -139,7 +139,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
null,
false,
false,
new Duration("PT0s")
new Duration("PT0s"),
Duration.millis(10)
);
pathChildrenCache = new PathChildrenCache(
curator,

View File

@@ -84,7 +84,8 @@ public class HttpLoadQueuePeonTest
null,
false,
false,
Duration.ZERO
Duration.ZERO,
Duration.millis(10)
)
{
@Override

View File

@@ -39,6 +39,7 @@ import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Duration;
import org.junit.After;
@@ -47,8 +48,10 @@ import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
public class LoadQueuePeonTest extends CuratorTestBase
{
@@ -79,46 +82,34 @@ public class LoadQueuePeonTest extends CuratorTestBase
@Test
public void testMultipleLoadDropSegments() throws Exception
{
final AtomicInteger requestSignalIdx = new AtomicInteger(0);
final AtomicInteger segmentSignalIdx = new AtomicInteger(0);
loadQueuePeon = new CuratorLoadQueuePeon(
curator,
LOAD_QUEUE_PATH,
jsonMapper,
Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
Execs.singleThreaded("test_load_queue_peon-%d"),
new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO)
new TestDruidCoordinatorConfig(
null,
null,
null,
null,
null,
null,
10,
null,
false,
false,
Duration.millis(0),
Duration.millis(10)
)
);
loadQueuePeon.start();
final CountDownLatch[] loadRequestSignal = new CountDownLatch[5];
final CountDownLatch[] dropRequestSignal = new CountDownLatch[5];
final CountDownLatch[] segmentLoadedSignal = new CountDownLatch[5];
final CountDownLatch[] segmentDroppedSignal = new CountDownLatch[5];
for (int i = 0; i < 5; ++i) {
loadRequestSignal[i] = new CountDownLatch(1);
dropRequestSignal[i] = new CountDownLatch(1);
segmentLoadedSignal[i] = new CountDownLatch(1);
segmentDroppedSignal[i] = new CountDownLatch(1);
}
final DataSegmentChangeHandler handler = new DataSegmentChangeHandler()
{
@Override
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
loadRequestSignal[requestSignalIdx.get()].countDown();
}
@Override
public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
dropRequestSignal[requestSignalIdx.get()].countDown();
}
};
ConcurrentMap<SegmentId, CountDownLatch> loadRequestSignals = new ConcurrentHashMap<>(5);
ConcurrentMap<SegmentId, CountDownLatch> dropRequestSignals = new ConcurrentHashMap<>(5);
ConcurrentMap<SegmentId, CountDownLatch> segmentLoadedSignals = new ConcurrentHashMap<>(5);
ConcurrentMap<SegmentId, CountDownLatch> segmentDroppedSignals = new ConcurrentHashMap<>(5);
final List<DataSegment> segmentToDrop = Lists.transform(
ImmutableList.of(
@@ -132,11 +123,24 @@ public class LoadQueuePeonTest extends CuratorTestBase
@Override
public DataSegment apply(String intervalStr)
{
return dataSegmentWithInterval(intervalStr);
DataSegment dataSegment = dataSegmentWithInterval(intervalStr);
return dataSegment;
}
}
);
final CountDownLatch[] dropRequestLatches = new CountDownLatch[5];
final CountDownLatch[] dropSegmentLatches = new CountDownLatch[5];
for (int i = 0; i < 5; i++) {
dropRequestLatches[i] = new CountDownLatch(1);
dropSegmentLatches[i] = new CountDownLatch(1);
}
int i = 0;
for (DataSegment s : segmentToDrop) {
dropRequestSignals.put(s.getId(), dropRequestLatches[i]);
segmentDroppedSignals.put(s.getId(), dropSegmentLatches[i++]);
}
final List<DataSegment> segmentToLoad = Lists.transform(
ImmutableList.of(
"2014-10-27T00:00:00Z/P1D",
@@ -149,11 +153,26 @@ public class LoadQueuePeonTest extends CuratorTestBase
@Override
public DataSegment apply(String intervalStr)
{
return dataSegmentWithInterval(intervalStr);
DataSegment dataSegment = dataSegmentWithInterval(intervalStr);
loadRequestSignals.put(dataSegment.getId(), new CountDownLatch(1));
segmentLoadedSignals.put(dataSegment.getId(), new CountDownLatch(1));
return dataSegment;
}
}
);
final CountDownLatch[] loadRequestLatches = new CountDownLatch[5];
final CountDownLatch[] segmentLoadedLatches = new CountDownLatch[5];
for (i = 0; i < 5; i++) {
loadRequestLatches[i] = new CountDownLatch(1);
segmentLoadedLatches[i] = new CountDownLatch(1);
}
i = 0;
for (DataSegment s : segmentToDrop) {
loadRequestSignals.put(s.getId(), loadRequestLatches[i]);
segmentLoadedSignals.put(s.getId(), segmentLoadedLatches[i++]);
}
// segment with latest interval should be loaded first
final List<DataSegment> expectedLoadOrder = Lists.transform(
ImmutableList.of(
@@ -162,59 +181,48 @@ public class LoadQueuePeonTest extends CuratorTestBase
"2014-10-30T00:00:00Z/P1D",
"2014-10-28T00:00:00Z/P1D",
"2014-10-27T00:00:00Z/P1D"
), new Function<String, DataSegment>()
{
@Override
public DataSegment apply(String intervalStr)
{
return dataSegmentWithInterval(intervalStr);
}
}
), intervalStr -> dataSegmentWithInterval(intervalStr)
);
final DataSegmentChangeHandler handler = new DataSegmentChangeHandler()
{
@Override
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
loadRequestSignals.get(segment.getId()).countDown();
}
@Override
public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
dropRequestSignals.get(segment.getId()).countDown();
}
};
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
DataSegmentChangeRequest request = jsonMapper.readValue(
event.getData().getData(),
DataSegmentChangeRequest.class
);
request.go(handler, null);
}
(client, event) -> {
if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
DataSegmentChangeRequest request = jsonMapper.readValue(
event.getData().getData(),
DataSegmentChangeRequest.class
);
request.go(handler, null);
}
}
);
loadQueueCache.start();
for (DataSegment segment : segmentToDrop) {
for (final DataSegment segment : segmentToDrop) {
loadQueuePeon.dropSegment(
segment,
new LoadPeonCallback()
{
@Override
public void execute()
{
segmentDroppedSignal[segmentSignalIdx.get()].countDown();
}
}
() -> segmentDroppedSignals.get(segment.getId()).countDown()
);
}
for (DataSegment segment : segmentToLoad) {
for (final DataSegment segment : segmentToLoad) {
loadQueuePeon.loadSegment(
segment,
new LoadPeonCallback()
{
@Override
public void execute()
{
segmentLoadedSignal[segmentSignalIdx.get()].countDown();
}
}
() -> segmentLoadedSignals.get(segment.getId()).countDown()
);
}
@@ -224,8 +232,14 @@ public class LoadQueuePeonTest extends CuratorTestBase
for (DataSegment segment : segmentToDrop) {
String dropRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString());
Assert.assertTrue(timing.forWaiting().awaitLatch(dropRequestSignal[requestSignalIdx.get()]));
Assert.assertNotNull(curator.checkExists().forPath(dropRequestPath));
Assert.assertTrue(
"Latch not counted down for " + dropRequestSignals.get(segment.getId()),
dropRequestSignals.get(segment.getId()).await(10, TimeUnit.SECONDS)
);
Assert.assertNotNull(
"Path " + dropRequestPath + " doesn't exist",
curator.checkExists().forPath(dropRequestPath)
);
Assert.assertEquals(
segment,
((SegmentChangeRequestDrop) jsonMapper.readValue(
@@ -235,29 +249,14 @@ public class LoadQueuePeonTest extends CuratorTestBase
)).getSegment()
);
if (requestSignalIdx.get() == 4) {
requestSignalIdx.set(0);
} else {
requestSignalIdx.incrementAndGet();
}
// simulate completion of drop request by historical
curator.delete().guaranteed().forPath(dropRequestPath);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentDroppedSignal[segmentSignalIdx.get()]));
int expectedNumSegmentToDrop = 5 - segmentSignalIdx.get() - 1;
Assert.assertEquals(expectedNumSegmentToDrop, loadQueuePeon.getSegmentsToDrop().size());
if (segmentSignalIdx.get() == 4) {
segmentSignalIdx.set(0);
} else {
segmentSignalIdx.incrementAndGet();
}
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentDroppedSignals.get(segment.getId())));
}
for (DataSegment segment : expectedLoadOrder) {
String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString());
Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal[requestSignalIdx.get()]));
Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignals.get(segment.getId())));
Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
Assert.assertEquals(
segment,
@@ -266,16 +265,9 @@ public class LoadQueuePeonTest extends CuratorTestBase
.getSegment()
);
requestSignalIdx.incrementAndGet();
// simulate completion of load request by historical
curator.delete().guaranteed().forPath(loadRequestPath);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal[segmentSignalIdx.get()]));
int expectedNumSegmentToLoad = 5 - segmentSignalIdx.get() - 1;
Assert.assertEquals(1200 * expectedNumSegmentToLoad, loadQueuePeon.getLoadQueueSize());
Assert.assertEquals(expectedNumSegmentToLoad, loadQueuePeon.getSegmentsToLoad().size());
segmentSignalIdx.incrementAndGet();
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignals.get(segment.getId())));
}
}
@@ -294,7 +286,20 @@ public class LoadQueuePeonTest extends CuratorTestBase
Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
Execs.singleThreaded("test_load_queue_peon-%d"),
// set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly
new TestDruidCoordinatorConfig(null, null, null, new Duration(1), null, null, 10, null, false, false, new Duration("PT1s"))
new TestDruidCoordinatorConfig(
null,
null,
null,
new Duration(1),
null,
null,
10,
null,
false,
false,
new Duration("PT1s"),
Duration.millis(10)
)
);
loadQueuePeon.start();

View File

@@ -19,7 +19,9 @@
package org.apache.druid.server.coordinator;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -29,7 +31,27 @@ public class LoadQueuePeonTester extends CuratorLoadQueuePeon
public LoadQueuePeonTester()
{
super(null, null, null, null, null, null);
super(
null,
null,
null,
Execs.scheduledSingleThreaded("LoadQueuePeonTester--%d"),
null,
new TestDruidCoordinatorConfig(
null,
null,
null,
new Duration(1),
null,
null,
10,
null,
false,
false,
new Duration("PT1s"),
Duration.millis(10)
)
);
}
@Override

View File

@@ -46,7 +46,8 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
String consoleStatic,
boolean mergeSegments,
boolean convertSegments,
Duration getLoadQueuePeonRepeatDelay
Duration getLoadQueuePeonRepeatDelay,
Duration CuratorCreateZkNodesRepeatDelay
)
{
this.coordinatorStartDelay = coordinatorStartDelay;
@@ -108,8 +109,10 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
return consoleStatic;
}
@Override public Duration getLoadQueuePeonRepeatDelay()
@Override
public Duration getLoadQueuePeonRepeatDelay()
{
return getLoadQueuePeonRepeatDelay;
}
}

View File

@@ -113,7 +113,8 @@ public class DruidCoordinatorSegmentKillerTest
null,
false,
false,
Duration.ZERO
Duration.ZERO,
Duration.millis(10)
)
);

View File

@@ -46,6 +46,7 @@ import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.CoordinatorIndexingServiceHelper;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.http.JettyHttpClientModule;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
@@ -90,7 +91,7 @@ import org.eclipse.jetty.server.Server;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
/**
*/
@@ -249,11 +250,19 @@ public class CliCoordinator extends ServerRunnable
ZkPathsConfig zkPaths
)
{
boolean useHttpLoadQueuePeon = "http".equalsIgnoreCase(config.getLoadQueuePeonType());
ExecutorService callBackExec;
if (useHttpLoadQueuePeon) {
callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d");
} else {
callBackExec = Execs.multiThreaded(config.getNumCuratorCallBackThreads(), "LoadQueuePeon"
+ "-callbackexec--%d");
}
return new LoadQueueTaskMaster(
curator,
jsonMapper,
factory.create(1, "Master-PeonExec--%d"),
Executors.newSingleThreadExecutor(),
callBackExec,
config,
httpClient,
zkPaths