From 9419366a8e68a599efa2d8a230a0158d1763d10d Mon Sep 17 00:00:00 2001 From: Scott Blum Date: Mon, 2 Oct 2017 16:50:57 -0400 Subject: [PATCH] SOLR-11423: Overseer queue needs a hard cap (maximum size) that clients respect --- solr/CHANGES.txt | 2 ++ .../java/org/apache/solr/cloud/Overseer.java | 3 +- .../apache/solr/cloud/ZkDistributedQueue.java | 34 ++++++++++++++++++- ...rseerCollectionConfigSetProcessorTest.java | 12 ++++++- 4 files changed, 48 insertions(+), 3 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 4996a131702..f43bee07a6f 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -132,6 +132,8 @@ New Features Bug Fixes ---------------------- +* SOLR-11423: Overseer queue needs a hard cap (maximum size) that clients respect (Scott Blum, Joshua Humphries, Noble Paul) + * SOLR-10602: Triggers should be able to restore state from old instances when taking over. (shalin) * SOLR-10714: OverseerTriggerThread does not start triggers on overseer start until autoscaling diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index e70226ead83..1dafa7d332c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -67,6 +67,7 @@ public class Overseer implements Closeable { public static final int STATE_UPDATE_DELAY = 2000; // delay between cloud state updates public static final int STATE_UPDATE_BATCH_SIZE = 10000; + public static final int STATE_UPDATE_MAX_QUEUE = 20000; public static final int NUM_RESPONSES_TO_STORE = 10000; public static final String OVERSEER_ELECT = "/overseer_elect"; @@ -626,7 +627,7 @@ public class Overseer implements Closeable { */ static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) { createOverseerNode(zkClient); - return new ZkDistributedQueue(zkClient, "/overseer/queue", zkStats); + return new ZkDistributedQueue(zkClient, "/overseer/queue", 
zkStats, STATE_UPDATE_MAX_QUEUE); } /** diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java index 5b4472aaf94..6a1b8a055c3 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; @@ -30,9 +31,9 @@ import java.util.function.Predicate; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.solr.common.cloud.DistributedQueue; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.cloud.DistributedQueue; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkCmdExecutor; import org.apache.solr.common.util.Pair; @@ -40,6 +41,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,11 +95,22 @@ public class ZkDistributedQueue implements DistributedQueue { private int watcherCount = 0; + private final int maxQueueSize; + + /** + * If {@link #maxQueueSize} is set, the number of items we can queue without rechecking the server. 
+ */ + private final AtomicInteger offerPermits = new AtomicInteger(0); + public ZkDistributedQueue(SolrZkClient zookeeper, String dir) { this(zookeeper, dir, new Overseer.Stats()); } public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats) { + this(zookeeper, dir, stats, 0); + } + + public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats, int maxQueueSize) { this.dir = dir; ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout()); @@ -112,6 +125,7 @@ public class ZkDistributedQueue implements DistributedQueue { this.zookeeper = zookeeper; this.stats = stats; + this.maxQueueSize = maxQueueSize; } /** @@ -244,6 +258,24 @@ public class ZkDistributedQueue implements DistributedQueue { try { while (true) { try { + if (maxQueueSize > 0) { + if (offerPermits.get() <= 0 || offerPermits.getAndDecrement() <= 0) { + // If a max queue size is set, check it before creating a new queue item. + Stat stat = zookeeper.exists(dir, null, true); + if (stat == null) { + // jump to the code below, which tries to create dir if it doesn't exist + throw new KeeperException.NoNodeException(); + } + int remainingCapacity = maxQueueSize - stat.getNumChildren(); + if (remainingCapacity <= 0) { + throw new IllegalStateException("queue is full"); + } + + // Allow this client to push up to 1% of the remaining queue capacity without rechecking. + offerPermits.set(remainingCapacity / 100); + } + } + // Explicitly set isDirty here so that synchronous same-thread calls behave as expected. // This will get set again when the watcher actually fires, but that's ok. 
zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true); diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java index 217359e9a1a..6694fc17154 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java @@ -45,6 +45,7 @@ import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.handler.component.ShardHandlerFactory; import org.apache.solr.handler.component.ShardRequest; import org.apache.solr.util.TimeOut; +import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -259,7 +260,16 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 { String key = invocation.getArgument(0); return zkMap.containsKey(key); }); - + + when(solrZkClientMock.exists(any(String.class), isNull(), anyBoolean())).thenAnswer(invocation -> { + String key = invocation.getArgument(0); + if (zkMap.containsKey(key)) { + return new Stat(); + } else { + return null; + } + }); + zkMap.put("/configs/myconfig", null); return liveNodes;