The complexity of removing a timeout listener was `O(n)`, which means that with many queued-up cluster state (CS) update tasks (such as in the case of an avalanche of dynamic mapping updates) we're dealing with quadratic complexity for timing out N tasks, which was observed to be an issue in practice. This PR makes the complexity of timing out a task `O(1)` and generally simplifies the iteration logic of listeners and appliers to be a little more efficient and to inline better.
This commit is contained in:
parent
6b2af30a62
commit
c81a076f5a
|
@ -42,22 +42,18 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
|||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.util.iterable.Iterables;
|
||||
import org.elasticsearch.threadpool.Scheduler;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -67,7 +63,6 @@ import java.util.function.Consumer;
|
|||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
|
||||
|
||||
|
@ -93,17 +88,12 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
|
|||
private final Collection<ClusterStateApplier> highPriorityStateAppliers = new CopyOnWriteArrayList<>();
|
||||
private final Collection<ClusterStateApplier> normalPriorityStateAppliers = new CopyOnWriteArrayList<>();
|
||||
private final Collection<ClusterStateApplier> lowPriorityStateAppliers = new CopyOnWriteArrayList<>();
|
||||
private final Iterable<ClusterStateApplier> clusterStateAppliers = Iterables.concat(highPriorityStateAppliers,
|
||||
normalPriorityStateAppliers, lowPriorityStateAppliers);
|
||||
|
||||
private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
|
||||
private final Collection<TimeoutClusterStateListener> timeoutClusterStateListeners =
|
||||
Collections.newSetFromMap(new ConcurrentHashMap<>());
|
||||
private final Map<TimeoutClusterStateListener, NotifyTimeout> timeoutClusterStateListeners = new ConcurrentHashMap<>();
|
||||
|
||||
private final LocalNodeMasterListeners localNodeMasterListeners;
|
||||
|
||||
private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();
|
||||
|
||||
private final AtomicReference<ClusterState> state; // last applied state
|
||||
|
||||
private final String nodeName;
|
||||
|
@ -180,18 +170,15 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
|
|||
|
||||
@Override
|
||||
protected synchronized void doStop() {
|
||||
for (NotifyTimeout onGoingTimeout : onGoingTimeouts) {
|
||||
onGoingTimeout.cancel();
|
||||
for (Map.Entry<TimeoutClusterStateListener, NotifyTimeout> onGoingTimeout : timeoutClusterStateListeners.entrySet()) {
|
||||
try {
|
||||
onGoingTimeout.cancel();
|
||||
onGoingTimeout.listener.onClose();
|
||||
onGoingTimeout.getValue().cancel();
|
||||
onGoingTimeout.getKey().onClose();
|
||||
} catch (Exception ex) {
|
||||
logger.debug("failed to notify listeners on shutdown", ex);
|
||||
}
|
||||
}
|
||||
ThreadPool.terminate(threadPoolExecutor, 10, TimeUnit.SECONDS);
|
||||
// close timeout listeners that did not have an ongoing timeout
|
||||
timeoutClusterStateListeners.forEach(TimeoutClusterStateListener::onClose);
|
||||
removeListener(localNodeMasterListeners);
|
||||
}
|
||||
|
||||
|
@ -258,13 +245,9 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
|
|||
* Removes a timeout listener for updated cluster states.
|
||||
*/
|
||||
public void removeTimeoutListener(TimeoutClusterStateListener listener) {
|
||||
timeoutClusterStateListeners.remove(listener);
|
||||
for (Iterator<NotifyTimeout> it = onGoingTimeouts.iterator(); it.hasNext(); ) {
|
||||
NotifyTimeout timeout = it.next();
|
||||
if (timeout.listener.equals(listener)) {
|
||||
timeout.cancel();
|
||||
it.remove();
|
||||
}
|
||||
final NotifyTimeout timeout = timeoutClusterStateListeners.remove(listener);
|
||||
if (timeout != null) {
|
||||
timeout.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -291,12 +274,12 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
|
|||
threadPoolExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") {
|
||||
@Override
|
||||
public void run() {
|
||||
final NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
|
||||
final NotifyTimeout previous = timeoutClusterStateListeners.put(listener, notifyTimeout);
|
||||
assert previous == null : "Added same listener [" + listener + "]";
|
||||
if (timeout != null) {
|
||||
NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
|
||||
notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, ThreadPool.Names.GENERIC);
|
||||
onGoingTimeouts.add(notifyTimeout);
|
||||
}
|
||||
timeoutClusterStateListeners.add(listener);
|
||||
listener.postAdded();
|
||||
}
|
||||
});
|
||||
|
@ -516,16 +499,29 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
|
|||
}
|
||||
|
||||
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
|
||||
clusterStateAppliers.forEach(applier -> {
|
||||
callClusterStateAppliers(clusterChangedEvent, stopWatch, highPriorityStateAppliers);
|
||||
callClusterStateAppliers(clusterChangedEvent, stopWatch, normalPriorityStateAppliers);
|
||||
callClusterStateAppliers(clusterChangedEvent, stopWatch, lowPriorityStateAppliers);
|
||||
}
|
||||
|
||||
private static void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch,
|
||||
Collection<ClusterStateApplier> clusterStateAppliers) {
|
||||
for (ClusterStateApplier applier : clusterStateAppliers) {
|
||||
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
|
||||
try (Releasable ignored = stopWatch.timing("running applier [" + applier + "]")) {
|
||||
applier.applyClusterState(clusterChangedEvent);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
|
||||
Stream.concat(clusterStateListeners.stream(), timeoutClusterStateListeners.stream()).forEach(listener -> {
|
||||
callClusterStateListener(clusterChangedEvent, stopWatch, clusterStateListeners);
|
||||
callClusterStateListener(clusterChangedEvent, stopWatch, timeoutClusterStateListeners.keySet());
|
||||
}
|
||||
|
||||
private void callClusterStateListener(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch,
|
||||
Collection<? extends ClusterStateListener> listeners) {
|
||||
for (ClusterStateListener listener : listeners) {
|
||||
try {
|
||||
logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
|
||||
try (Releasable ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
|
||||
|
@ -534,7 +530,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
|
|||
} catch (Exception ex) {
|
||||
logger.warn("failed to notify ClusterStateListener", ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static class SafeClusterApplyListener implements ClusterApplyListener {
|
||||
|
@ -578,12 +574,13 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
|
|||
}
|
||||
}
|
||||
|
||||
class NotifyTimeout implements Runnable {
|
||||
private class NotifyTimeout implements Runnable {
|
||||
final TimeoutClusterStateListener listener;
|
||||
@Nullable
|
||||
final TimeValue timeout;
|
||||
volatile Scheduler.Cancellable cancellable;
|
||||
|
||||
NotifyTimeout(TimeoutClusterStateListener listener, TimeValue timeout) {
|
||||
NotifyTimeout(TimeoutClusterStateListener listener, @Nullable TimeValue timeout) {
|
||||
this.listener = listener;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
@ -596,6 +593,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
assert timeout != null : "This should only ever execute if there's an actual timeout set";
|
||||
if (cancellable != null && cancellable.isCancelled()) {
|
||||
return;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue