mirror of https://github.com/apache/lucene.git
SOLR-11423: Overseer queue needs a hard cap (maximum size) that clients respect
This commit is contained in:
parent
77fc1885c7
commit
9419366a8e
|
@ -132,6 +132,8 @@ New Features
|
|||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
* SOLR-11423: Overseer queue needs a hard cap (maximum size) that clients respect (Scott Blum, Joshua Humphries, Noble Paul)
|
||||
|
||||
* SOLR-10602: Triggers should be able to restore state from old instances when taking over. (shalin)
|
||||
|
||||
* SOLR-10714: OverseerTriggerThread does not start triggers on overseer start until autoscaling
|
||||
|
|
|
@ -67,6 +67,7 @@ public class Overseer implements Closeable {
|
|||
|
||||
public static final int STATE_UPDATE_DELAY = 2000; // delay between cloud state updates
|
||||
public static final int STATE_UPDATE_BATCH_SIZE = 10000;
|
||||
public static final int STATE_UPDATE_MAX_QUEUE = 20000; // maximum size handed to the state update queue in getStateUpdateQueue
|
||||
|
||||
public static final int NUM_RESPONSES_TO_STORE = 10000;
|
||||
public static final String OVERSEER_ELECT = "/overseer_elect";
|
||||
|
@ -626,7 +627,7 @@ public class Overseer implements Closeable {
|
|||
*/
|
||||
static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) {
|
||||
createOverseerNode(zkClient);
|
||||
return new ZkDistributedQueue(zkClient, "/overseer/queue", zkStats);
|
||||
return new ZkDistributedQueue(zkClient, "/overseer/queue", zkStats, STATE_UPDATE_MAX_QUEUE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
|||
import java.util.NoSuchElementException;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Predicate;
|
||||
|
@ -30,9 +31,9 @@ import java.util.function.Predicate;
|
|||
import com.codahale.metrics.Timer;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.solr.common.cloud.DistributedQueue;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.DistributedQueue;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkCmdExecutor;
|
||||
import org.apache.solr.common.util.Pair;
|
||||
|
@ -40,6 +41,7 @@ import org.apache.zookeeper.CreateMode;
|
|||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -93,11 +95,22 @@ public class ZkDistributedQueue implements DistributedQueue {
|
|||
|
||||
private int watcherCount = 0;
|
||||
|
||||
private final int maxQueueSize; // cap on the number of children under dir; the check is only applied when this is > 0
|
||||
|
||||
/**
|
||||
* If {@link #maxQueueSize} is set, the number of items we can queue without rechecking the server.
|
||||
*/
|
||||
private final AtomicInteger offerPermits = new AtomicInteger(0);
|
||||
|
||||
/** Creates a queue on {@code dir} with a newly created {@link Overseer.Stats}; delegates to the stats-taking constructor. */
public ZkDistributedQueue(SolrZkClient zookeeper, String dir) {
|
||||
this(zookeeper, dir, new Overseer.Stats());
|
||||
}
|
||||
|
||||
/**
 * Creates a queue on {@code dir} that records into the given {@code stats}.
 * Delegates with a {@code maxQueueSize} of 0; the size check is guarded by
 * {@code maxQueueSize > 0}, so a queue built this way is not size-checked.
 */
public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats) {
|
||||
this(zookeeper, dir, stats, 0);
|
||||
}
|
||||
|
||||
public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats, int maxQueueSize) {
|
||||
this.dir = dir;
|
||||
|
||||
ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
|
||||
|
@ -112,6 +125,7 @@ public class ZkDistributedQueue implements DistributedQueue {
|
|||
|
||||
this.zookeeper = zookeeper;
|
||||
this.stats = stats;
|
||||
this.maxQueueSize = maxQueueSize;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -244,6 +258,24 @@ public class ZkDistributedQueue implements DistributedQueue {
|
|||
try {
|
||||
while (true) {
|
||||
try {
|
||||
if (maxQueueSize > 0) {
|
||||
if (offerPermits.get() <= 0 || offerPermits.getAndDecrement() <= 0) {
|
||||
// If a max queue size is set, check it before creating a new queue item.
|
||||
Stat stat = zookeeper.exists(dir, null, true);
|
||||
if (stat == null) {
|
||||
// jump to the code below, which tries to create dir if it doesn't exist
|
||||
throw new KeeperException.NoNodeException();
|
||||
}
|
||||
int remainingCapacity = maxQueueSize - stat.getNumChildren();
|
||||
if (remainingCapacity <= 0) {
|
||||
throw new IllegalStateException("queue is full");
|
||||
}
|
||||
|
||||
// Allow this client to push up to 1% of the remaining queue capacity without rechecking.
|
||||
// Use the remaining capacity computed above: without it, '/' binds tighter than '-', so only
// the child count was divided by 100 and almost maxQueueSize permits were handed out.
offerPermits.set(remainingCapacity / 100);
|
||||
}
|
||||
}
|
||||
|
||||
// Explicitly set isDirty here so that synchronous same-thread calls behave as expected.
|
||||
// This will get set again when the watcher actually fires, but that's ok.
|
||||
zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.solr.handler.component.ShardHandler;
|
|||
import org.apache.solr.handler.component.ShardHandlerFactory;
|
||||
import org.apache.solr.handler.component.ShardRequest;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
@ -260,6 +261,15 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
|
|||
return zkMap.containsKey(key);
|
||||
});
|
||||
|
||||
// Stub exists(path, null watcher, anyBoolean): answer from the in-memory zkMap instead of ZooKeeper.
when(solrZkClientMock.exists(any(String.class), isNull(), anyBoolean())).thenAnswer(invocation -> {
|
||||
String key = invocation.getArgument(0);
|
||||
if (zkMap.containsKey(key)) {
|
||||
// Known path: hand back a default-constructed Stat.
return new Stat();
|
||||
} else {
|
||||
// Unknown path: null, which the stubbed callers treat as "node does not exist".
return null;
|
||||
}
|
||||
});
|
||||
|
||||
zkMap.put("/configs/myconfig", null);
|
||||
|
||||
return liveNodes;
|
||||
|
|
Loading…
Reference in New Issue