diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 802ad3227c5..f1571b13106 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -326,6 +326,9 @@ Bug Fixes the number of replicas. This avoids making too many cascading calls to remote servers, which, if not restricted, can bring down nodes containing the said collection (Kesharee Nandan Vishwakarma, Ishan Chattopadhyaya) +* SOLR-13834: ZkController#getSolrCloudManager() created a new instance of ZkStateReader, thereby causing mismatch in the + visibility of the cluster state and, as a result, undesired race conditions (Clay Goddard via Ishan Chattopadhyaya) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 4f6218f1239..ebba806f35c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -40,7 +40,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -59,6 +58,7 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder; import org.apache.solr.client.solrj.impl.SolrClientCloudManager; +import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider; import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; import org.apache.solr.cloud.overseer.OverseerAction; @@ -746,7 +746,7 @@ public class ZkController implements Closeable { if (cloudManager != null) { return cloudManager; } - cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zkServerAddress), Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000) + cloudSolrClient = new CloudSolrClient.Builder(new ZkClientClusterStateProvider(zkStateReader)).withSocketTimeout(30000).withConnectionTimeout(15000) .withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient()) .withConnectionTimeout(15000).withSocketTimeout(30000).build(); cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cloudSolrClient); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java index 08d27e580de..bb1081d5650 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java @@ -77,10 +77,13 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0"); } - ZkStateReader zkStateReader = ocmh.zkStateReader; + //ZkStateReader zkStateReader = ocmh.zkStateReader; ocmh.overseer.offerStateUpdate(Utils.toJSON(message)); // wait for a while until we see the shard - ocmh.waitForNewShard(collectionName, sliceName); + //ocmh.waitForNewShard(collectionName, sliceName); + // wait for a while until we see the shard and update the local view of the cluster state + clusterState = ocmh.waitForNewShard(collectionName, sliceName); + String async = message.getStr(ASYNC); ZkNodeProps addReplicasProps = new ZkNodeProps( COLLECTION_PROP, collectionName, @@ -97,7 +100,8 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd { if (async != null) addReplicasProps.getProperties().put(ASYNC, async); final NamedList addResult = new NamedList(); try { - ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> { + //ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> { + ocmh.addReplica(clusterState, addReplicasProps, addResult, () -> { Object addResultFailure = addResult.get("failure"); if (addResultFailure != null) { SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure"); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java index 64b0ef9712a..54b7f5b5d6f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java @@ -553,12 +553,14 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName"); } - void waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException { + ClusterState waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException { log.debug("Waiting for slice {} of collection {} to be available", sliceName, collectionName); RTimer timer = new RTimer(); int retryCount = 320; while (retryCount-- > 0) { - DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName); + ClusterState clusterState = zkStateReader.getClusterState(); + DocCollection collection = clusterState.getCollection(collectionName); + if (collection == null) { throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to find collection: " + collectionName + " in clusterstate"); @@ -567,7 +569,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, if (slice != null) { log.debug("Waited for {}ms for slice {} of collection {} to be available", timer.getTime(), sliceName, collectionName); - return; + return clusterState; } Thread.sleep(1000); } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java index da098af2773..32329fff2f5 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java @@ -310,11 +310,8 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd { ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap))); - // wait until we are able to see the new shard in cluster state - ocmh.waitForNewShard(collectionName, subSlice); - - // refresh cluster state - clusterState = zkStateReader.getClusterState(); + // wait until we are able to see the new shard in cluster state and refresh the local view of the cluster state + clusterState = ocmh.waitForNewShard(collectionName, subSlice); log.debug("Adding first replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName + " on " + nodeName); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java index 0b087802f89..24748caba5f 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java @@ -253,7 +253,14 @@ public class CloudSolrClient extends BaseCloudSolrClient { public Builder(List solrUrls) { this.solrUrls = solrUrls; } - + + /** + * Provide an already created {@link ClusterStateProvider} instance + */ + public Builder(ClusterStateProvider stateProvider) { + this.stateProvider = stateProvider; + } + /** * Provide a series of ZK hosts which will be used when configuring {@link CloudSolrClient} instances. *