mirror of https://github.com/apache/lucene.git
SOLR-13834: ZkController#getSolrCloudManager() now uses the same ZkStateReader instance instead of instantiating a new one
ZkController#getSolrCloudManager() created a new instance of ZkStateReader, thereby causing mismatch in the visibility of the cluster state and, as a result, undesired race conditions.
This commit is contained in:
parent
b8648c60e7
commit
e2b160b865
|
@ -326,6 +326,9 @@ Bug Fixes
|
|||
the number of replicas. This avoids making too many cascading calls to remote servers, which, if not restricted, can
|
||||
bring down nodes containing the said collection (Kesharee Nandan Vishwakarma, Ishan Chattopadhyaya)
|
||||
|
||||
* SOLR-13834: ZkController#getSolrCloudManager() created a new instance of ZkStateReader, thereby causing mismatch in the
|
||||
visibility of the cluster state and, as a result, undesired race conditions (Clay Goddard via Ishan Chattopadhyaya)
|
||||
|
||||
Other Changes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -40,7 +40,6 @@ import java.util.List;
|
|||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@ -59,6 +58,7 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
|||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
|
||||
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
|
||||
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||
|
@ -746,7 +746,7 @@ public class ZkController implements Closeable {
|
|||
if (cloudManager != null) {
|
||||
return cloudManager;
|
||||
}
|
||||
cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zkServerAddress), Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000)
|
||||
cloudSolrClient = new CloudSolrClient.Builder(new ZkClientClusterStateProvider(zkStateReader)).withSocketTimeout(30000).withConnectionTimeout(15000)
|
||||
.withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient())
|
||||
.withConnectionTimeout(15000).withSocketTimeout(30000).build();
|
||||
cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cloudSolrClient);
|
||||
|
|
|
@ -77,10 +77,13 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
|
||||
}
|
||||
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
//ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
|
||||
// wait for a while until we see the shard
|
||||
ocmh.waitForNewShard(collectionName, sliceName);
|
||||
//ocmh.waitForNewShard(collectionName, sliceName);
|
||||
// wait for a while until we see the shard and update the local view of the cluster state
|
||||
clusterState = ocmh.waitForNewShard(collectionName, sliceName);
|
||||
|
||||
String async = message.getStr(ASYNC);
|
||||
ZkNodeProps addReplicasProps = new ZkNodeProps(
|
||||
COLLECTION_PROP, collectionName,
|
||||
|
@ -97,7 +100,8 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
|
||||
final NamedList addResult = new NamedList();
|
||||
try {
|
||||
ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> {
|
||||
//ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> {
|
||||
ocmh.addReplica(clusterState, addReplicasProps, addResult, () -> {
|
||||
Object addResultFailure = addResult.get("failure");
|
||||
if (addResultFailure != null) {
|
||||
SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
|
||||
|
|
|
@ -553,12 +553,14 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
|
|||
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
|
||||
}
|
||||
|
||||
void waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
|
||||
ClusterState waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
|
||||
log.debug("Waiting for slice {} of collection {} to be available", sliceName, collectionName);
|
||||
RTimer timer = new RTimer();
|
||||
int retryCount = 320;
|
||||
while (retryCount-- > 0) {
|
||||
DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
DocCollection collection = clusterState.getCollection(collectionName);
|
||||
|
||||
if (collection == null) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR,
|
||||
"Unable to find collection: " + collectionName + " in clusterstate");
|
||||
|
@ -567,7 +569,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
|
|||
if (slice != null) {
|
||||
log.debug("Waited for {}ms for slice {} of collection {} to be available",
|
||||
timer.getTime(), sliceName, collectionName);
|
||||
return;
|
||||
return clusterState;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
|
|
@ -310,11 +310,8 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
|
||||
ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
|
||||
|
||||
// wait until we are able to see the new shard in cluster state
|
||||
ocmh.waitForNewShard(collectionName, subSlice);
|
||||
|
||||
// refresh cluster state
|
||||
clusterState = zkStateReader.getClusterState();
|
||||
// wait until we are able to see the new shard in cluster state and refresh the local view of the cluster state
|
||||
clusterState = ocmh.waitForNewShard(collectionName, subSlice);
|
||||
|
||||
log.debug("Adding first replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
|
||||
+ " on " + nodeName);
|
||||
|
|
|
@ -254,6 +254,13 @@ public class CloudSolrClient extends BaseCloudSolrClient {
|
|||
this.solrUrls = solrUrls;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide an already created {@link ClusterStateProvider} instance
|
||||
*/
|
||||
public Builder(ClusterStateProvider stateProvider) {
|
||||
this.stateProvider = stateProvider;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide a series of ZK hosts which will be used when configuring {@link CloudSolrClient} instances.
|
||||
*
|
||||
|
|
Loading…
Reference in New Issue