SOLR-14253 Replace sleep calls with ZK waits (#1297)

Co-Authored-By: markrmiller <markrmiller@apache.org>
This commit is contained in:
Mike Drob 2021-02-01 13:25:17 -06:00 committed by GitHub
parent e8bc758144
commit 99748384cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 148 additions and 216 deletions

View File

@ -59,6 +59,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.AlreadyClosedException;
@ -1695,60 +1696,39 @@ public class ZkController implements Closeable {
}
private void waitForCoreNodeName(CoreDescriptor descriptor) {
int retryCount = 320;
log.debug("look for our core node name");
while (retryCount-- > 0) {
final DocCollection docCollection = zkStateReader.getClusterState()
.getCollectionOrNull(descriptor.getCloudDescriptor().getCollectionName());
if (docCollection != null && docCollection.getSlicesMap() != null) {
final Map<String, Slice> slicesMap = docCollection.getSlicesMap();
for (Slice slice : slicesMap.values()) {
for (Replica replica : slice.getReplicas()) {
// TODO: for really large clusters, we could 'index' on this
String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
String msgNodeName = getNodeName();
String msgCore = descriptor.getName();
if (msgNodeName.equals(nodeName) && core.equals(msgCore)) {
descriptor.getCloudDescriptor()
.setCoreNodeName(replica.getName());
getCoreContainer().getCoresLocator().persist(getCoreContainer(), descriptor);
return;
}
}
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
log.debug("waitForCoreNodeName >>> look for our core node name");
try {
zkStateReader.waitForState(descriptor.getCollectionName(), 320L, TimeUnit.SECONDS, c -> {
String name = ClusterStateMutator.getAssignedCoreNodeName(c, getNodeName(), descriptor.getName());
if (name == null) return false;
descriptor.getCloudDescriptor().setCoreNodeName(name);
return true;
});
} catch (TimeoutException | InterruptedException e) {
SolrZkClient.checkInterrupted(e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed waiting for collection state", e);
}
getCoreContainer().getCoresLocator().persist(getCoreContainer(), descriptor);
}
private void waitForShardId(CoreDescriptor cd) {
private void waitForShardId(final CoreDescriptor cd) {
if (log.isDebugEnabled()) {
log.debug("waiting to find shard id in clusterstate for {}", cd.getName());
}
int retryCount = 320;
while (retryCount-- > 0) {
final String shardId = zkStateReader.getClusterState().getShardId(cd.getCollectionName(), getNodeName(), cd.getName());
if (shardId != null) {
cd.getCloudDescriptor().setShardId(shardId);
return;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
try {
zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS, c -> {
if (c == null) return false;
final String shardId = c.getShardId(getNodeName(), cd.getName());
if (shardId != null) {
cd.getCloudDescriptor().setShardId(shardId);
return true;
}
return false;
});
} catch (TimeoutException | InterruptedException e) {
SolrZkClient.checkInterrupted(e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed getting shard id for core: " + cd.getName(), e);
}
throw new SolrException(ErrorCode.SERVER_ERROR,
"Could not get shard id for core: " + cd.getName());
}

View File

@ -219,7 +219,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
params.set(CoreAdminParams.CORE_NODE_NAME,
ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(createReplica.coreName)).get(createReplica.coreName).getName());
ocmh.waitToSeeReplicasInState(collectionName, Collections.singleton(createReplica.coreName)).get(createReplica.coreName).getName());
String configName = zkStateReader.readConfigName(collectionName);
String routeKey = message.getStr(ShardParams._ROUTE_);

View File

@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
@ -52,6 +53,7 @@ import org.apache.solr.cloud.OverseerNodePrioritizer;
import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.Stats;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
@ -85,8 +87,6 @@ import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.util.RTimer;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@ -487,59 +487,34 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
}
String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
int retryCount = 320;
while (retryCount-- > 0) {
final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
if (docCollection != null && docCollection.getSlicesMap() != null) {
Map<String,Slice> slicesMap = docCollection.getSlicesMap();
for (Slice slice : slicesMap.values()) {
for (Replica replica : slice.getReplicas()) {
// TODO: for really large clusters, we could 'index' on this
String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
if (nodeName.equals(msgNodeName) && core.equals(msgCore)) {
return replica.getName();
}
}
AtomicReference<String> coreNodeName = new AtomicReference<>();
try {
zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, c -> {
String name = ClusterStateMutator.getAssignedCoreNodeName(c, msgNodeName, msgCore);
if (name == null) {
return false;
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
coreNodeName.set(name);
return true;
});
} catch (TimeoutException | InterruptedException e) {
SolrZkClient.checkInterrupted(e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed waiting for coreNodeName", e);
}
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
return coreNodeName.get();
}
ClusterState waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
log.debug("Waiting for slice {} of collection {} to be available", sliceName, collectionName);
RTimer timer = new RTimer();
int retryCount = 320;
while (retryCount-- > 0) {
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection collection = clusterState.getCollection(collectionName);
if (collection == null) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"Unable to find collection: " + collectionName + " in clusterstate");
}
Slice slice = collection.getSlice(sliceName);
if (slice != null) {
if (log.isDebugEnabled()) {
log.debug("Waited for {}ms for slice {} of collection {} to be available",
timer.getTime(), sliceName, collectionName);
}
return clusterState;
}
Thread.sleep(1000);
try {
zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, c -> {
return c != null && c.getSlice(sliceName) != null;
});
} catch (TimeoutException | InterruptedException e) {
SolrZkClient.checkInterrupted(e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed waiting for new slice", e);
}
throw new SolrException(ErrorCode.SERVER_ERROR,
"Could not find new slice " + sliceName + " in collection " + collectionName
+ " even after waiting for " + timer.getTime() + "ms"
);
return zkStateReader.getClusterState();
}
DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
@ -592,34 +567,32 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
overseer.offerStateUpdate(Utils.toJSON(message));
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
boolean areChangesVisible = true;
while (!timeout.hasTimedOut()) {
DocCollection collection = cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName);
areChangesVisible = true;
for (Map.Entry<String,Object> updateEntry : message.getProperties().entrySet()) {
String updateKey = updateEntry.getKey();
try {
zkStateReader.waitForState(collectionName, 30, TimeUnit.SECONDS, c -> {
if (c == null) return false;
if (!updateKey.equals(ZkStateReader.COLLECTION_PROP)
&& !updateKey.equals(Overseer.QUEUE_OPERATION)
&& updateEntry.getValue() != null // handled below in a separate conditional
&& !updateEntry.getValue().equals(collection.get(updateKey))) {
areChangesVisible = false;
break;
for (Map.Entry<String,Object> updateEntry : message.getProperties().entrySet()) {
String updateKey = updateEntry.getKey();
if (!updateKey.equals(ZkStateReader.COLLECTION_PROP)
&& !updateKey.equals(Overseer.QUEUE_OPERATION)
&& updateEntry.getValue() != null // handled below in a separate conditional
&& !updateEntry.getValue().equals(c.get(updateKey))) {
return false;
}
if (updateEntry.getValue() == null && c.containsKey(updateKey)) {
return false;
}
}
if (updateEntry.getValue() == null && collection.containsKey(updateKey)) {
areChangesVisible = false;
break;
}
}
if (areChangesVisible) break;
timeout.sleep(100);
return true;
});
} catch (TimeoutException | InterruptedException e) {
SolrZkClient.checkInterrupted(e);
log.debug("modifyCollection(ClusterState={}, ZkNodeProps={}, NamedList={})", clusterState, message, results, e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to modify collection", e);
}
if (!areChangesVisible)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not modify collection " + message);
// if switching to/from read-only mode reload the collection
if (message.keySet().contains(ZkStateReader.READ_ONLY)) {
reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
@ -636,33 +609,29 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
assert coreNames.size() > 0;
Map<String, Replica> result = new HashMap<>();
TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
while (true) {
DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName);
for (String coreName : coreNames) {
if (result.containsKey(coreName)) continue;
for (Slice slice : coll.getSlices()) {
for (Replica replica : slice.getReplicas()) {
if (coreName.equals(replica.getStr(ZkStateReader.CORE_NAME_PROP))) {
result.put(coreName, replica);
break;
}
}
}
}
Map<String, Replica> results = new HashMap<>();
AtomicReference<DocCollection> lastState = new AtomicReference<>();
if (result.size() == coreNames.size()) {
return result;
} else {
log.debug("Expecting {} cores but found {}", coreNames, result);
}
if (timeout.hasTimedOut()) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + coll);
}
long maxWait = Long.getLong("solr.waitToSeeReplicasInStateTimeoutSeconds", 120); // could be a big cluster
try {
zkStateReader.waitForState(collectionName, maxWait, TimeUnit.SECONDS, c -> {
if (c == null) return false;
Thread.sleep(100);
c.getSlices().stream().flatMap(slice -> slice.getReplicas().stream())
.filter(r -> coreNames.contains(r.getCoreName())) // Only the elements that were asked for...
.filter(r -> !results.containsKey(r.getCoreName())) // ...but not the ones we've seen already...
.forEach(r -> results.put(r.getCoreName(), r)); // ...get added to the map
lastState.set(c);
log.debug("Expecting {} cores, found {}", coreNames, results);
return results.size() == coreNames.size();
});
} catch (TimeoutException e) {
String error = "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + lastState.get();
throw new SolrException(ErrorCode.SERVER_ERROR, error);
}
return results;
}
List<ZkNodeProps> addReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete)

View File

@ -19,6 +19,7 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrServerException;
@ -66,51 +67,24 @@ public class LeaderElectionIntegrationTest extends SolrCloudTestCase {
String collection = "collection1";
createCollection(collection);
cluster.waitForActiveCollection(collection, 10, TimeUnit.SECONDS, 2, 6);
List<JettySolrRunner> stoppedRunners = new ArrayList<>();
for (int i = 0; i < 4; i++) {
// who is the leader?
String leader = getLeader(collection);
JettySolrRunner jetty = getRunner(leader);
assertNotNull(jetty);
assertTrue("shard1".equals(jetty.getCoreContainer().getCores().iterator().next()
.getCoreDescriptor().getCloudDescriptor().getShardId()));
assertEquals("shard1", jetty.getCoreContainer().getCores().iterator().next()
.getCoreDescriptor().getCloudDescriptor().getShardId());
jetty.stop();
stoppedRunners.add(jetty);
// poll until leader change is visible
for (int j = 0; j < 90; j++) {
String currentLeader = getLeader(collection);
if(!leader.equals(currentLeader)) {
break;
}
Thread.sleep(500);
}
leader = getLeader(collection);
int retry = 0;
while (jetty == getRunner(leader)) {
if (retry++ == 60) {
break;
}
Thread.sleep(1000);
}
if (jetty == getRunner(leader)) {
cluster.getZkClient().printLayoutToStream(System.out);
fail("We didn't find a new leader! " + jetty + " was close, but it's still showing as the leader");
}
assertTrue("shard1".equals(getRunner(leader).getCoreContainer().getCores().iterator().next()
.getCoreDescriptor().getCloudDescriptor().getShardId()));
}
for (JettySolrRunner runner : stoppedRunners) {
runner.start();
}
waitForState("Expected to see nodes come back " + collection, collection,
(n, c) -> {
return n.size() == 6;
});
(n, c) -> n.size() == 6);
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
// testLeaderElectionAfterClientTimeout

View File

@ -31,6 +31,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.http.client.HttpClient;
import org.apache.solr.SolrTestCaseJ4;
@ -303,6 +304,11 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
when(zkStateReaderMock.getZkClient()).thenReturn(solrZkClientMock);
when(zkStateReaderMock.getClusterState()).thenReturn(clusterStateMock);
when(zkStateReaderMock.getAliases()).thenReturn(Aliases.EMPTY);
doAnswer(invocation -> {
Predicate<DocCollection> p = invocation.getArgument(3);
p.test(clusterStateMock.getCollection(invocation.getArgument(0)));
return null;
}).when(zkStateReaderMock).waitForState(anyString(), anyLong(), any(), any(Predicate.class));
when(clusterStateMock.getCollection(anyString())).thenAnswer(invocation -> {
String key = invocation.getArgument(0);

View File

@ -36,6 +36,7 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.LinkedHashMapWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.ValidatingJsonMap;
import org.apache.solr.handler.DumpRequestHandler;
@ -47,6 +48,7 @@ import org.apache.solr.util.RESTfulServerProvider;
import org.apache.solr.util.RestTestBase;
import org.apache.solr.util.RestTestHarness;
import org.apache.solr.util.SimplePostTool;
import org.apache.solr.util.TimeOut;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
import org.junit.Before;
@ -613,10 +615,10 @@ public class TestSolrConfigHandler extends RestTestBase {
long maxTimeoutSeconds) throws Exception {
boolean success = false;
long startTime = System.nanoTime();
LinkedHashMapWriter m = null;
while (TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) < maxTimeoutSeconds) {
TimeOut timeOut = new TimeOut(maxTimeoutSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (! timeOut.hasTimedOut()) {
try {
m = testServerBaseUrl == null ? getRespMap(uri, harness) : TestSolrConfigHandlerConcurrent.getAsMap(testServerBaseUrl + uri, cloudSolrClient);
} catch (Exception e) {

View File

@ -130,6 +130,7 @@ public class MiniSolrCloudCluster {
" \n" +
"</solr>\n";
private final Object startupWait = new Object();
private volatile ZkTestServer zkServer; // non-final due to injectChaos()
private final boolean externalZkServer;
private final List<JettySolrRunner> jettys = new CopyOnWriteArrayList<>();
@ -330,46 +331,45 @@ public class MiniSolrCloudCluster {
private void waitForAllNodes(int numServers, int timeoutSeconds) throws IOException, InterruptedException, TimeoutException {
log.info("waitForAllNodes: numServers={}", numServers);
int numRunning = 0;
int numRunning;
if (timeoutSeconds == 0) {
timeoutSeconds = DEFAULT_TIMEOUT;
}
TimeOut timeout = new TimeOut(timeoutSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (true) {
if (timeout.hasTimedOut()) {
throw new IllegalStateException("giving up waiting for all jetty instances to be running. numServers=" + numServers
+ " numRunning=" + numRunning);
}
numRunning = 0;
for (JettySolrRunner jetty : getJettySolrRunners()) {
if (jetty.isRunning()) {
numRunning++;
synchronized (startupWait) {
while (numServers != (numRunning = numRunningJetty(getJettySolrRunners()))) {
if (timeout.hasTimedOut()) {
throw new IllegalStateException("giving up waiting for all jetty instances to be running. numServers=" + numServers
+ " numRunning=" + numRunning);
}
startupWait.wait(500);
}
if (numServers == numRunning) {
break;
}
Thread.sleep(100);
}
ZkStateReader reader = getSolrClient().getZkStateReader();
for (JettySolrRunner jetty : getJettySolrRunners()) {
reader.waitForLiveNodes(30, TimeUnit.SECONDS, (o, n) -> n.contains(jetty.getNodeName()));
for (JettySolrRunner runner : getJettySolrRunners()) {
waitForNode(runner, (int) timeout.timeLeft(TimeUnit.SECONDS));
}
}
public void waitForNode(JettySolrRunner jetty, int timeoutSeconds)
throws IOException, InterruptedException, TimeoutException {
if (log.isInfoEnabled()) {
log.info("waitForNode: {}", jetty.getNodeName());
private int numRunningJetty(List<JettySolrRunner> runners) {
int numRunning = 0;
for (JettySolrRunner jsr : runners) {
if (jsr.isRunning()) numRunning++;
}
return numRunning;
}
public void waitForNode(JettySolrRunner jetty, int timeoutSeconds) throws InterruptedException, TimeoutException {
String nodeName = jetty.getNodeName();
if (nodeName == null) {
throw new IllegalArgumentException("Cannot wait for Jetty with null node name");
}
log.info("waitForNode: {}", nodeName);
ZkStateReader reader = getSolrClient().getZkStateReader();
reader.waitForLiveNodes(30, TimeUnit.SECONDS, (o, n) -> n.contains(jetty.getNodeName()));
reader.waitForLiveNodes(timeoutSeconds, TimeUnit.SECONDS, (o, n) -> n != null && n.contains(nodeName));
}
/**
@ -483,6 +483,9 @@ public class MiniSolrCloudCluster {
: new JettySolrRunnerWithMetrics(runnerPath.toString(), nodeProps, newConfig);
jetty.start();
jettys.add(jetty);
synchronized (startupWait) {
startupWait.notifyAll();
}
return jetty;
}
@ -809,22 +812,20 @@ public class MiniSolrCloudCluster {
}
public void waitForJettyToStop(JettySolrRunner runner) throws TimeoutException {
if (log.isInfoEnabled()) {
log.info("waitForJettyToStop: {}", runner.getLocalPort());
String nodeName = runner.getNodeName();
if (nodeName == null) {
log.info("Cannot wait for Jetty with null node name");
return;
}
TimeOut timeout = new TimeOut(15, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while(!timeout.hasTimedOut()) {
if (runner.isStopped()) {
break;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore
}
}
if (timeout.hasTimedOut()) {
throw new TimeoutException("Waiting for Jetty to stop timed out");
log.info("waitForJettyToStop: {}", nodeName);
ZkStateReader reader = getSolrClient().getZkStateReader();
try {
reader.waitForLiveNodes(15, TimeUnit.SECONDS, (o, n) -> ! n.contains(nodeName));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR, "interrupted", e);
}
}