SOLR-14253 Avoid writes in ZKSR.waitForState (#2297)

This commit is contained in:
Mike Drob 2021-02-03 14:40:07 -06:00 committed by GitHub
parent d693a61185
commit 40c5d6b750
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 24 additions and 39 deletions

View File

@ -1698,12 +1698,11 @@ public class ZkController implements Closeable {
private void waitForCoreNodeName(CoreDescriptor descriptor) {
log.debug("waitForCoreNodeName >>> look for our core node name");
try {
zkStateReader.waitForState(descriptor.getCollectionName(), 320L, TimeUnit.SECONDS, c -> {
String name = ClusterStateMutator.getAssignedCoreNodeName(c, getNodeName(), descriptor.getName());
if (name == null) return false;
descriptor.getCloudDescriptor().setCoreNodeName(name);
return true;
});
DocCollection collection = zkStateReader.waitForState(descriptor.getCollectionName(), 320L, TimeUnit.SECONDS,
c -> ClusterStateMutator.getAssignedCoreNodeName(c, getNodeName(), descriptor.getName()) != null);
// Read outside of the predicate to avoid multiple potential writes
String name = ClusterStateMutator.getAssignedCoreNodeName(collection, getNodeName(), descriptor.getName());
descriptor.getCloudDescriptor().setCoreNodeName(name);
} catch (TimeoutException | InterruptedException e) {
SolrZkClient.checkInterrupted(e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed waiting for collection state", e);
@ -1716,15 +1715,10 @@ public class ZkController implements Closeable {
log.debug("waiting to find shard id in clusterstate for {}", cd.getName());
}
try {
zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS, c -> {
if (c == null) return false;
final String shardId = c.getShardId(getNodeName(), cd.getName());
if (shardId != null) {
cd.getCloudDescriptor().setShardId(shardId);
return true;
}
return false;
});
DocCollection collection = zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS,
c -> c != null && c.getShardId(getNodeName(), cd.getName()) != null);
// Read outside of the predicate to avoid multiple potential writes
cd.getCloudDescriptor().setShardId(collection.getShardId(getNodeName(), cd.getName()));
} catch (TimeoutException | InterruptedException e) {
SolrZkClient.checkInterrupted(e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed getting shard id for core: " + cd.getName(), e);
@ -1814,10 +1808,8 @@ public class ZkController implements Closeable {
}
AtomicReference<String> errorMessage = new AtomicReference<>();
AtomicReference<DocCollection> collectionState = new AtomicReference<>();
try {
zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c) -> {
collectionState.set(c);
if (c == null)
return false;
Slice slice = c.getSlice(cloudDesc.getShardId());

View File

@ -153,7 +153,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
// wait for a while until we don't see the collection
zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (collectionState) -> collectionState == null);
zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, Objects::isNull);
// we can delete any remaining unique aliases
if (!aliasReferences.isEmpty()) {

View File

@ -27,11 +27,11 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
@ -487,21 +487,15 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
}
String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
AtomicReference<String> coreNodeName = new AtomicReference<>();
try {
zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, c -> {
String name = ClusterStateMutator.getAssignedCoreNodeName(c, msgNodeName, msgCore);
if (name == null) {
return false;
}
coreNodeName.set(name);
return true;
});
DocCollection collection = zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, c ->
ClusterStateMutator.getAssignedCoreNodeName(c, msgNodeName, msgCore) != null
);
return ClusterStateMutator.getAssignedCoreNodeName(collection, msgNodeName, msgCore);
} catch (TimeoutException | InterruptedException e) {
SolrZkClient.checkInterrupted(e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed waiting for coreNodeName", e);
}
return coreNodeName.get();
}
ClusterState waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
@ -609,26 +603,23 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
assert coreNames.size() > 0;
Map<String, Replica> results = new HashMap<>();
AtomicReference<DocCollection> lastState = new AtomicReference<>();
Map<String, Replica> results = new ConcurrentHashMap<>();
long maxWait = Long.getLong("solr.waitToSeeReplicasInStateTimeoutSeconds", 120); // could be a big cluster
try {
zkStateReader.waitForState(collectionName, maxWait, TimeUnit.SECONDS, c -> {
if (c == null) return false;
// We write into a ConcurrentHashMap, which will be ok if called multiple times by multiple threads
c.getSlices().stream().flatMap(slice -> slice.getReplicas().stream())
.filter(r -> coreNames.contains(r.getCoreName())) // Only the elements that were asked for...
.filter(r -> !results.containsKey(r.getCoreName())) // ...but not the ones we've seen already...
.forEach(r -> results.put(r.getCoreName(), r)); // ...get added to the map
.filter(r -> coreNames.contains(r.getCoreName())) // Only the elements that were asked for...
.forEach(r -> results.putIfAbsent(r.getCoreName(), r)); // ...get added to the map
lastState.set(c);
log.debug("Expecting {} cores, found {}", coreNames, results);
return results.size() == coreNames.size();
});
} catch (TimeoutException e) {
String error = "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + lastState.get();
throw new SolrException(ErrorCode.SERVER_ERROR, error);
throw new SolrException(ErrorCode.SERVER_ERROR, e.getMessage(), e);
}
return results;

View File

@ -1698,16 +1698,18 @@ public class ZkStateReader implements SolrCloseable {
* <p>
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
* The predicate may also be called concurrently when multiple state changes are seen in rapid succession.
* </p>
*
* @param collection the collection to watch
* @param wait how long to wait
* @param unit the units of the wait parameter
* @param predicate the predicate to call on state changes
* @return the state of the doc collection after the predicate succeeds
* @throws InterruptedException on interrupt
* @throws TimeoutException on timeout
*/
public void waitForState(final String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
public DocCollection waitForState(final String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
throws InterruptedException, TimeoutException {
if (log.isDebugEnabled()) {
log.debug("Waiting up to {}ms for state {}", unit.toMillis(wait), predicate);
@ -1733,7 +1735,7 @@ public class ZkStateReader implements SolrCloseable {
// wait for the watcher predicate to return true, or time out
if (!latch.await(wait, unit))
throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get());
return docCollection.get();
} finally {
removeDocCollectionWatcher(collection, watcher);
waitLatches.remove(latch);