From 8505d4d416fdf707bab55bc4da9a71ddb3374274 Mon Sep 17 00:00:00 2001
From: Noble Paul
Date: Tue, 19 Jan 2021 02:59:41 +1100
Subject: [PATCH] SOLR-15052: Per-replica states for reducing overseer
bottlenecks (trunk) (#2177)
---
solr/CHANGES.txt | 4 +
.../cloud/ShardLeaderElectionContextBase.java | 18 +-
.../org/apache/solr/cloud/ZkController.java | 55 +--
.../OverseerCollectionMessageHandler.java | 7 +-
.../cloud/overseer/CollectionMutator.java | 32 +-
.../solr/cloud/overseer/NodeMutator.java | 11 +-
.../solr/cloud/overseer/ReplicaMutator.java | 17 +-
.../solr/cloud/overseer/SliceMutator.java | 46 ++-
.../solr/cloud/overseer/ZkStateWriter.java | 73 +++-
.../solr/cloud/overseer/ZkWriteCommand.java | 22 +-
.../handler/admin/CollectionsHandler.java | 14 +-
.../solr/cloud/CollectionsAPISolrJTest.java | 25 +-
.../apache/solr/cloud/ZkSolrClientTest.java | 31 ++
.../solr/handler/PingRequestHandlerTest.java | 2 +
.../apache/solr/handler/TestSQLHandler.java | 4 +-
.../solr/handler/TestStressThreadBackup.java | 12 +-
.../handler/admin/HealthCheckHandlerTest.java | 1 +
.../admin/MetricsHistoryHandlerTest.java | 3 +-
.../handler/component/SearchHandlerTest.java | 4 +
.../TestSubQueryTransformerDistrib.java | 2 +
.../solr/schema/TestManagedSchemaAPI.java | 2 +
.../facet/TestCloudJSONFacetJoinDomain.java | 1 +
.../search/facet/TestCloudJSONFacetSKG.java | 1 +
.../facet/TestCloudJSONFacetSKGEquiv.java | 1 +
.../join/CrossCollectionJoinQueryTest.java | 2 +
.../solr/search/stats/TestDistribIDF.java | 3 +-
.../TemplateUpdateProcessorTest.java | 2 +-
.../util/tracing/TestDistributedTracing.java | 1 +
.../src/collection-management.adoc | 3 +
.../solrj/cloud/DistribStateManager.java | 7 +
.../solrj/impl/SolrClientCloudManager.java | 5 +-
.../solrj/impl/ZkDistribStateManager.java | 6 +
.../solrj/request/CollectionAdminRequest.java | 8 +
.../solr/common/cloud/ClusterState.java | 73 ++++
.../solr/common/cloud/DocCollection.java | 83 ++++-
.../solr/common/cloud/PerReplicaStates.java | 312 ++++++++++++++++++
.../common/cloud/PerReplicaStatesOps.java | 305 +++++++++++++++++
.../org/apache/solr/common/cloud/Replica.java | 73 +++-
.../org/apache/solr/common/cloud/Slice.java | 22 +-
.../solr/common/cloud/SolrZkClient.java | 12 +
.../solr/common/cloud/ZkStateReader.java | 88 ++++-
.../apispec/collections.Commands.json | 5 +
.../IndexingNestedDocuments.java | 7 +-
.../JsonRequestApiHeatmapFacetingTest.java | 4 +-
.../JsonRequestApiTest.java | 3 +-
.../UsingSolrJRefGuideExamplesTest.java | 1 +
.../solrj/impl/CloudSolrClientTest.java | 72 +++-
.../solrj/io/stream/CloudAuthStreamTest.java | 8 +-
.../solrj/io/stream/JDBCStreamTest.java | 4 +-
.../solrj/io/stream/MathExpressionTest.java | 5 +-
.../io/stream/SelectWithEvaluatorsTest.java | 4 +-
.../solrj/io/stream/StreamDecoratorTest.java | 51 ++-
...onQueryRequestFacetingIntegrationTest.java | 8 +-
...onQueryRequestFacetingIntegrationTest.java | 8 +-
.../JsonQueryRequestHeatmapFacetingTest.java | 4 +-
.../cloud/TestCloudCollectionsListeners.java | 4 +
.../cloud/TestCollectionStateWatchers.java | 21 +-
.../common/cloud/TestPerReplicaStates.java | 133 ++++++++
.../apache/solr/cloud/SolrCloudTestCase.java | 1 +
59 files changed, 1572 insertions(+), 164 deletions(-)
create mode 100644 solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
create mode 100644 solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
create mode 100644 solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7c8a26ff12d..f17cc986265 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -209,6 +209,8 @@ New Features
* SOLR-14560: Add interleaving support in Learning To Rank. (Alessandro Benedetti, Christine Poerschke)
+* SOLR-15052: Reducing overseer bottlenecks using per-replica states (noble, Ishan Chattopadhyaya)
+
Improvements
---------------------
* SOLR-14942: Reduce leader election time on node shutdown by removing election nodes before closing cores.
@@ -385,6 +387,8 @@ New Features
* SOLR-14615: CPU Utilization Based Circuit Breaker (Atri Sharma)
+* SOLR-15052: Reducing overseer bottlenecks using per-replica states (noble, Ishan Chattopadhyaya)
+
Improvements
---------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index b010de24afb..531294a8390 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -19,18 +19,13 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import java.util.List;
import java.util.ArrayList;
+import java.util.List;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkCmdExecutor;
-import org.apache.solr.common.cloud.ZkMaintenanceUtils;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.*;
import org.apache.solr.common.util.RetryUtil;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
@@ -179,7 +174,14 @@ class ShardLeaderElectionContextBase extends ElectionContext {
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
assert zkController != null;
assert zkController.getOverseer() != null;
- zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
+ DocCollection coll = zkStateReader.getCollection(this.collection);
+ if (coll == null || ZkController.sendToOverseer(coll, id)) {
+ zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
+ } else {
+ PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
+ PerReplicaStatesOps.flipLeader(zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicaNames(), id, prs)
+ .persist(coll.getZNode(), zkStateReader.getZkClient());
+ }
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 5df5cee6628..0e573ad2dfc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -64,31 +64,8 @@ import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.BeforeReconnect;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ConnectionManager;
-import org.apache.solr.common.cloud.DefaultConnectionStrategy;
-import org.apache.solr.common.cloud.DefaultZkACLProvider;
-import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocCollectionWatcher;
-import org.apache.solr.common.cloud.LiveNodesListener;
-import org.apache.solr.common.cloud.NodesSysPropsCacher;
-import org.apache.solr.common.cloud.OnReconnect;
-import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.*;
import org.apache.solr.common.cloud.Replica.Type;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.UrlScheme;
-import org.apache.solr.common.cloud.ZkACLProvider;
-import org.apache.solr.common.cloud.ZkCmdExecutor;
-import org.apache.solr.common.cloud.ZkConfigManager;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkCredentialsProvider;
-import org.apache.solr.common.cloud.ZkMaintenanceUtils;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
@@ -1606,12 +1583,40 @@ public class ZkController implements Closeable {
if (updateLastState) {
cd.getCloudDescriptor().setLastPublished(state);
}
- overseerJobQueue.offer(Utils.toJSON(m));
+ DocCollection coll = zkStateReader.getCollection(collection);
+ if (forcePublish || sendToOverseer(coll, coreNodeName)) {
+ overseerJobQueue.offer(Utils.toJSON(m));
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("bypassed overseer for message : {}", Utils.toJSONString(m));
+ }
+ PerReplicaStates perReplicaStates = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
+ PerReplicaStatesOps.flipState(coreNodeName, state, perReplicaStates)
+ .persist(coll.getZNode(), zkClient);
+ }
} finally {
MDCLoggingContext.clear();
}
}
+ /**
+ * Whether a message needs to be sent to overseer or not
+ */
+ static boolean sendToOverseer(DocCollection coll, String replicaName) {
+ if (coll == null) return true;
+ if (!coll.isPerReplicaState()) return true;
+ Replica r = coll.getReplica(replicaName);
+ if (r == null) return true;
+ Slice shard = coll.getSlice(r.shard);
+ if (shard == null) return true;//very unlikely
+ if (shard.getState() == Slice.State.RECOVERY) return true;
+ if (shard.getParent() != null) return true;
+ for (Slice slice : coll.getSlices()) {
+ if (Objects.equals(shard.getName(), slice.getParent())) return true;
+ }
+ return false;
+ }
+
public ZkShardTerms getShardTerms(String collection, String shardId) {
return getCollectionTerms(collection).getShard(shardId);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 41508422f14..b69d4d43376 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -36,10 +36,10 @@ import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.cloud.DistribStateManager;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.BadVersionException;
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -75,6 +75,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.common.util.TimeSource;
@@ -84,7 +85,6 @@ import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.util.RTimer;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.CreateMode;
@@ -142,6 +142,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
ZkStateReader.REPLICATION_FACTOR, "1",
ZkStateReader.NRT_REPLICAS, "1",
ZkStateReader.TLOG_REPLICAS, "0",
+ DocCollection.PER_REPLICA_STATE, null,
ZkStateReader.PULL_REPLICAS, "0"));
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index b64ca49ac9e..073bdaf75f3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -25,13 +25,7 @@ import java.util.Map;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.ImplicitDocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.*;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,10 +40,12 @@ public class CollectionMutator {
protected final SolrCloudManager cloudManager;
protected final DistribStateManager stateManager;
+ protected final SolrZkClient zkClient;
public CollectionMutator(SolrCloudManager cloudManager) {
this.cloudManager = cloudManager;
this.stateManager = cloudManager.getDistribStateManager();
+ this.zkClient = SliceMutator.getZkClient(cloudManager);
}
public ZkWriteCommand createShard(final ClusterState clusterState, ZkNodeProps message) {
@@ -107,7 +103,21 @@ public class CollectionMutator {
DocCollection coll = clusterState.getCollection(message.getStr(COLLECTION_PROP));
Map m = coll.shallowCopy();
boolean hasAnyOps = false;
+ PerReplicaStatesOps replicaOps = null;
for (String prop : CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES) {
+ if (prop.equals(DocCollection.PER_REPLICA_STATE)) {
+ String val = message.getStr(DocCollection.PER_REPLICA_STATE);
+ if (val == null) continue;
+ boolean enable = Boolean.parseBoolean(val);
+ if (enable == coll.isPerReplicaState()) {
+ //already enabled
+ log.error("trying to set perReplicaState to {} from {}", val, coll.isPerReplicaState());
+ continue;
+ }
+ replicaOps = PerReplicaStatesOps.modifyCollection(coll, enable, PerReplicaStates.fetch(coll.getZNode(), zkClient, null));
+ }
+
+
if (message.containsKey(prop)) {
hasAnyOps = true;
if (message.get(prop) == null) {
@@ -136,8 +146,12 @@ public class CollectionMutator {
return ZkStateWriter.NO_OP;
}
- return new ZkWriteCommand(coll.getName(),
- new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion()));
+ DocCollection collection = new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion());
+ if (replicaOps == null){
+ return new ZkWriteCommand(coll.getName(), collection);
+ } else {
+ return new ZkWriteCommand(coll.getName(), collection, replicaOps, true);
+ }
}
public static DocCollection updateSlice(String collectionName, DocCollection collection, Slice slice) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index 3f1971efe05..48e0a16946c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -45,6 +46,8 @@ public class NodeMutator {
Map collections = clusterState.getCollectionsMap();
for (Map.Entry entry : collections.entrySet()) {
+ List downedReplicas = new ArrayList<>();
+
String collection = entry.getKey();
DocCollection docCollection = entry.getValue();
@@ -68,6 +71,7 @@ public class NodeMutator {
Replica.State.DOWN, replica.type, props);
newReplicas.put(replica.getName(), newReplica);
needToUpdateCollection = true;
+ downedReplicas.add(replica.getName());
}
}
@@ -76,7 +80,12 @@ public class NodeMutator {
}
if (needToUpdateCollection) {
- zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
+ if (docCollection.isPerReplicaState()) {
+ zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy),
+ PerReplicaStatesOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
+ } else {
+ zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
+ }
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index f849143142f..ecf06e4ed59 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -39,8 +39,11 @@ import org.apache.solr.cloud.api.collections.SplitShardCmd;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
@@ -50,6 +53,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
import static org.apache.solr.cloud.overseer.CollectionMutator.checkKeyExistence;
+import static org.apache.solr.cloud.overseer.SliceMutator.getZkClient;
import static org.apache.solr.common.params.CommonParams.NAME;
public class ReplicaMutator {
@@ -57,10 +61,12 @@ public class ReplicaMutator {
protected final SolrCloudManager cloudManager;
protected final DistribStateManager stateManager;
+ protected SolrZkClient zkClient;
public ReplicaMutator(SolrCloudManager cloudManager) {
this.cloudManager = cloudManager;
this.stateManager = cloudManager.getDistribStateManager();
+ this.zkClient = getZkClient(cloudManager);
}
protected Replica setProperty(Replica replica, String key, String value) {
@@ -260,6 +266,7 @@ public class ReplicaMutator {
log.info("Failed to update state because the replica does not exist, {}", message);
return ZkStateWriter.NO_OP;
}
+ boolean persistCollectionState = collection != null && collection.isPerReplicaState();
if (coreNodeName == null) {
coreNodeName = ClusterStateMutator.getAssignedCoreNodeName(collection,
@@ -271,6 +278,7 @@ public class ReplicaMutator {
log.info("Failed to update state because the replica does not exist, {}", message);
return ZkStateWriter.NO_OP;
}
+ persistCollectionState = true;
// if coreNodeName is null, auto assign one
coreNodeName = Assign.assignCoreNodeName(stateManager, collection);
}
@@ -285,6 +293,7 @@ public class ReplicaMutator {
if (sliceName != null) {
log.debug("shard={} is already registered", sliceName);
}
+ persistCollectionState = true;
}
if (sliceName == null) {
//request new shardId
@@ -295,6 +304,7 @@ public class ReplicaMutator {
}
sliceName = Assign.assignShard(collection, numShards);
log.info("Assigning new node to shard shard={}", sliceName);
+ persistCollectionState = true;
}
Slice slice = collection != null ? collection.getSlice(sliceName) : null;
@@ -366,7 +376,12 @@ public class ReplicaMutator {
DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice);
log.debug("Collection is now: {}", newCollection);
- return new ZkWriteCommand(collectionName, newCollection);
+ if (collection != null && collection.isPerReplicaState()) {
+ PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
+ return new ZkWriteCommand(collectionName, newCollection, PerReplicaStatesOps.flipState(replica.getName(), replica.getState(), prs), persistCollectionState);
+ } else{
+ return new ZkWriteCommand(collectionName, newCollection);
+ }
}
private DocCollection checkAndCompleteShardSplit(ClusterState prevState, DocCollection collection, String coreNodeName, String sliceName, Replica replica) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 40ab1a359f9..019339e197c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -25,14 +25,18 @@ import java.util.Set;
import com.google.common.collect.ImmutableSet;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.RoutingRule;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -51,10 +55,21 @@ public class SliceMutator {
protected final SolrCloudManager cloudManager;
protected final DistribStateManager stateManager;
+ protected final SolrZkClient zkClient;
public SliceMutator(SolrCloudManager cloudManager) {
this.cloudManager = cloudManager;
this.stateManager = cloudManager.getDistribStateManager();
+ this.zkClient = getZkClient(cloudManager);
+ }
+
+ static SolrZkClient getZkClient(SolrCloudManager cloudManager) {
+ if (cloudManager instanceof SolrClientCloudManager) {
+ SolrClientCloudManager manager = (SolrClientCloudManager) cloudManager;
+ return manager.getZkClient();
+ } else {
+ return null;
+ }
}
public ZkWriteCommand addReplica(ClusterState clusterState, ZkNodeProps message) {
@@ -80,7 +95,15 @@ public class SliceMutator {
ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP),
ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP),
ZkStateReader.REPLICA_TYPE, message.get(ZkStateReader.REPLICA_TYPE)), coll, slice);
- return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
+
+ if (collection.isPerReplicaState()) {
+ PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
+ return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica),
+ PerReplicaStatesOps.addReplica(replica.getName(), replica.getState(), replica.isLeader(), prs), true);
+ } else {
+ return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
+ }
+
}
public ZkWriteCommand removeReplica(ClusterState clusterState, ZkNodeProps message) {
@@ -106,7 +129,12 @@ public class SliceMutator {
newSlices.put(slice.getName(), slice);
}
- return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices));
+
+ if (coll.isPerReplicaState()) {
+ return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices), PerReplicaStatesOps.deleteReplica(cnn, coll.getPerReplicaStates()) , true);
+ } else {
+ return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices));
+ }
}
public ZkWriteCommand setShardLeader(ClusterState clusterState, ZkNodeProps message) {
@@ -124,6 +152,7 @@ public class SliceMutator {
Slice slice = slices.get(sliceName);
Replica oldLeader = slice.getLeader();
+ Replica newLeader = null;
final Map newReplicas = new LinkedHashMap<>();
for (Replica replica : slice.getReplicas()) {
// TODO: this should only be calculated once and cached somewhere?
@@ -132,7 +161,7 @@ public class SliceMutator {
if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
} else if (coreURL.equals(leaderUrl)) {
- replica = new ReplicaMutator(cloudManager).setLeader(replica);
+ newLeader = replica = new ReplicaMutator(cloudManager).setLeader(replica);
}
newReplicas.put(replica.getName(), replica);
@@ -141,7 +170,16 @@ public class SliceMutator {
Map newSliceProps = slice.shallowCopy();
newSliceProps.put(Slice.REPLICAS, newReplicas);
slice = new Slice(slice.getName(), newReplicas, slice.getProperties(), collectionName);
- return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice));
+ if (coll.isPerReplicaState()) {
+ PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
+ return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice),
+ PerReplicaStatesOps.flipLeader(
+ slice.getReplicaNames(),
+ newLeader == null ? null : newLeader.getName(),
+ prs), false);
+ } else {
+ return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice));
+ }
}
public ZkWriteCommand updateShardState(ClusterState clusterState, ZkNodeProps message) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 155fbc218e0..978b2096b2b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -27,6 +27,7 @@ import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.Stats;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
@@ -63,7 +64,7 @@ public class ZkStateWriter {
protected final ZkStateReader reader;
protected final Stats stats;
- protected Map updates = new HashMap<>();
+ protected Map updates = new HashMap<>();
private int numUpdates = 0;
protected ClusterState clusterState = null;
protected long lastUpdatedTime = 0;
@@ -111,15 +112,47 @@ public class ZkStateWriter {
if (cmds.isEmpty()) return prevState;
if (isNoOps(cmds)) return prevState;
+ boolean forceFlush = false;
+ if (cmds.size() == 1) {
+ //most messages result in only one command. let's deal with it right away
+ ZkWriteCommand cmd = cmds.get(0);
+ if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
+ //we do not wish to batch any updates for collections with per-replica state because
+ // these changes go to individual ZK nodes and there is zero advantage to batching
+ //now check if there are any updates for the same collection already present
+ if (updates.containsKey(cmd.name)) {
+ //this should not happen
+ // but let's get those updates out anyway
+ writeUpdate(updates.remove(cmd.name));
+ }
+ //now let's write the current message
+ try {
+ return writeUpdate(cmd);
+ } finally {
+ if (callback !=null) callback.onWrite();
+ }
+ }
+ } else {
+ //there are more than one commands created as a result of this message
+ for (ZkWriteCommand cmd : cmds) {
+ if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
+ // we don't try to optimize for this case. let's flush out all after this
+ forceFlush = true;
+ break;
+ }
+ }
+ }
+
+
for (ZkWriteCommand cmd : cmds) {
if (cmd == NO_OP) continue;
prevState = prevState.copyWith(cmd.name, cmd.collection);
- updates.put(cmd.name, cmd.collection);
+ updates.put(cmd.name, cmd);
numUpdates++;
}
clusterState = prevState;
- if (maybeFlushAfter()) {
+ if (forceFlush || maybeFlushAfter()) {
ClusterState state = writePendingUpdates();
if (callback != null) {
callback.onWrite();
@@ -149,7 +182,15 @@ public class ZkStateWriter {
public boolean hasPendingUpdates() {
return numUpdates != 0;
}
+ public ClusterState writeUpdate(ZkWriteCommand command) throws IllegalStateException, KeeperException, InterruptedException {
+ Map commands = new HashMap<>();
+ commands.put(command.name, command);
+ return writePendingUpdates(commands);
+ }
+ public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
+ return writePendingUpdates(updates);
+ }
/**
* Writes all pending updates to ZooKeeper and returns the modified cluster state
*
@@ -158,20 +199,30 @@ public class ZkStateWriter {
* @throws KeeperException if any ZooKeeper operation results in an error
* @throws InterruptedException if the current thread is interrupted
*/
- public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException {
+ public ClusterState writePendingUpdates(Map updates) throws IllegalStateException, KeeperException, InterruptedException {
if (invalidState) {
throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
}
- if (!hasPendingUpdates()) return clusterState;
+ if ((updates == this.updates)
+ && !hasPendingUpdates()) {
+ return clusterState;
+ }
Timer.Context timerContext = stats.time("update_state");
boolean success = false;
try {
if (!updates.isEmpty()) {
- for (Map.Entry entry : updates.entrySet()) {
+ for (Map.Entry entry : updates.entrySet()) {
String name = entry.getKey();
String path = ZkStateReader.getCollectionPath(name);
- DocCollection c = entry.getValue();
+ ZkWriteCommand cmd = entry.getValue();
+ DocCollection c = cmd.collection;
+ if(cmd.ops != null && cmd.ops.isPreOp()) {
+ cmd.ops.persist(path, reader.getZkClient());
+ clusterState = clusterState.copyWith(name,
+ cmd.collection.copyWith(PerReplicaStates.fetch(cmd.collection.getZNode(), reader.getZkClient(), null)));
+ }
+ if (!cmd.persistCollState) continue;
if (c == null) {
// let's clean up the state.json of this collection only, the rest should be clean by delete collection cmd
log.debug("going to delete state.json {}", path);
@@ -192,6 +243,14 @@ public class ZkStateWriter {
clusterState = clusterState.copyWith(name, newCollection);
}
}
+ if(cmd.ops != null && !cmd.ops.isPreOp()) {
+ cmd.ops.persist(path, reader.getZkClient());
+ DocCollection currentCollState = clusterState.getCollection(cmd.name);
+ if ( currentCollState != null) {
+ clusterState = clusterState.copyWith(name,
+ currentCollState.copyWith(PerReplicaStates.fetch(currentCollState.getZNode(), reader.getZkClient(), null)));
+ }
+ }
}
updates.clear();
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
index d464863df4b..39d953c540f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
@@ -17,16 +17,34 @@
package org.apache.solr.cloud.overseer;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
public class ZkWriteCommand {
+
public final String name;
public final DocCollection collection;
- public final boolean noop;
+ public final boolean noop;
+ // persist the collection state. If this is false, it means the collection state is not modified
+ public final boolean persistCollState;
+ public final PerReplicaStatesOps ops;
+
+ public ZkWriteCommand(String name, DocCollection collection, PerReplicaStatesOps replicaOps, boolean persistCollState) {
+ boolean isPerReplicaState = collection.isPerReplicaState();
+ this.name = name;
+ this.collection = collection;
+ this.noop = false;
+ this.ops = isPerReplicaState ? replicaOps : null;
+ this.persistCollState = isPerReplicaState ? persistCollState : true;
+ }
public ZkWriteCommand(String name, DocCollection collection) {
this.name = name;
this.collection = collection;
this.noop = false;
+ persistCollState = true;
+ this.ops = collection != null && collection.isPerReplicaState() ?
+ PerReplicaStatesOps.touchChildren():
+ null;
}
/**
@@ -36,6 +54,8 @@ public class ZkWriteCommand {
this.noop = true;
this.name = null;
this.collection = null;
+ this.ops = null;
+ persistCollState = true;
}
public static ZkWriteCommand noop() {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 399b87bf400..288f6c5d124 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -19,17 +19,7 @@ package org.apache.solr.handler.admin;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -118,6 +108,7 @@ import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHan
import static org.apache.solr.cloud.api.collections.RoutedAlias.CREATE_COLLECTION_PREFIX;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
+import static org.apache.solr.common.cloud.DocCollection.PER_REPLICA_STATE;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
@@ -462,6 +453,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
TLOG_REPLICAS,
NRT_REPLICAS,
WAIT_FOR_FINAL_STATE,
+ PER_REPLICA_STATE,
ALIAS);
if (props.get(REPLICATION_FACTOR) != null && props.get(NRT_REPLICAS) != null) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index c4fc4e4d5b3..e3a89164c50 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -16,13 +16,6 @@
*/
package org.apache.solr.cloud;
-import static java.util.Arrays.asList;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_DEF;
-import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.NUM_SHARDS_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
-import static org.apache.solr.common.params.CollectionAdminParams.DEFAULTS;
-
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
@@ -77,6 +70,13 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.Arrays.asList;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_DEF;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.NUM_SHARDS_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
+import static org.apache.solr.common.params.CollectionAdminParams.DEFAULTS;
+
@LuceneTestCase.Slow
public class CollectionsAPISolrJTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -104,6 +104,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
public void testCreateWithDefaultConfigSet() throws Exception {
String collectionName = "solrj_default_configset";
CollectionAdminResponse response = CollectionAdminRequest.createCollection(collectionName, 2, 2)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(collectionName, 2, 4);
@@ -236,6 +237,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
assertEquals("2", String.valueOf(clusterProperty));
CollectionAdminResponse response = CollectionAdminRequest
.createCollection(COLL_NAME, "conf", null, null, null, null)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
assertEquals(0, response.getStatus());
assertTrue(response.isSuccess());
@@ -406,7 +408,9 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
public void testCreateAndDeleteAlias() throws IOException, SolrServerException {
final String collection = "aliasedCollection";
- CollectionAdminRequest.createCollection(collection, "conf", 1, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(collection, "conf", 1, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
CollectionAdminResponse response
= CollectionAdminRequest.createAlias("solrj_alias", collection).process(cluster.getSolrClient());
@@ -421,6 +425,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
final String collectionName = "solrj_test_splitshard";
CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(collectionName, 2, 2);
@@ -475,6 +480,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
cluster.getJettySolrRunners().forEach(j -> j.getCoreContainer().getAllowPaths().add(tmpDir));
CollectionAdminResponse response = CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.withProperty(CoreAdminParams.DATA_DIR, dataDir.toString())
.withProperty(CoreAdminParams.ULOG_DIR, ulogDir.toString())
.process(cluster.getSolrClient());
@@ -501,6 +507,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
final String collectionName = "solrj_replicatests";
CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(collectionName, 1, 2);
@@ -569,6 +576,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
final String propName = "testProperty";
CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(collectionName, 2, 4);
@@ -600,6 +608,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
public void testColStatus() throws Exception {
final String collectionName = "collectionStatusTest";
CollectionAdminRequest.createCollection(collectionName, "conf2", 2, 2)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(collectionName, 2, 4);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
index e9afc3bdd9d..3f86835128c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
@@ -18,7 +18,9 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -29,12 +31,16 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.apache.solr.cloud.SolrCloudTestCase.configureCluster;
+
public class ZkSolrClientTest extends SolrTestCaseJ4 {
@BeforeClass
@@ -376,6 +382,31 @@ public class ZkSolrClientTest extends SolrTestCaseJ4 {
}
}
+ public void testZkBehavior() throws Exception {
+ MiniSolrCloudCluster cluster =
+ configureCluster(4)
+ .withJettyConfig(jetty -> jetty.enableV2(true))
+ .configure();
+ try {
+ SolrZkClient zkClient = cluster.getZkClient();
+ zkClient.create("/test-node", null, CreateMode.PERSISTENT, true);
+
+ Stat stat = zkClient.exists("/test-node", null, true);
+ int cversion = stat.getCversion();
+ List ops = Arrays.asList(
+ Op.create("/test-node/abc", null, zkClient.getZkACLProvider().getACLsToAdd("/test-node/abc"), CreateMode.PERSISTENT),
+ Op.delete("/test-node/abc", -1));
+ zkClient.multi(ops, true);
+ stat = zkClient.exists("/test-node", null, true);
+ assertTrue(stat.getCversion() >= cversion + 2);
+ } finally {
+ cluster.shutdown();
+ }
+
+ }
+
+
+
@Override
public void tearDown() throws Exception {
super.tearDown();
diff --git a/solr/core/src/test/org/apache/solr/handler/PingRequestHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/PingRequestHandlerTest.java
index dd8aead99e1..9d196cf50ad 100644
--- a/solr/core/src/test/org/apache/solr/handler/PingRequestHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/PingRequestHandlerTest.java
@@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.SolrPing;
import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
@@ -188,6 +189,7 @@ public class PingRequestHandlerTest extends SolrTestCaseJ4 {
String configName = "solrCloudCollectionConfig";
miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1").resolve("conf"), configName);
CollectionAdminRequest.createCollection(collectionName, configName, NUM_SHARDS, REPLICATION_FACTOR)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(miniCluster.getSolrClient());
// Send distributed and non-distributed ping query
diff --git a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
index 96555ca5053..5bc68532535 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
@@ -63,7 +63,9 @@ public class TestSQLHandler extends SolrCloudTestCase {
collection = COLLECTIONORALIAS;
}
- CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection(collection, 2, 2);
if (useAlias) {
CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
diff --git a/solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java b/solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java
index 3622948c50f..b1cc214b130 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java
@@ -24,9 +24,9 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -34,16 +34,16 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
-import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase.Nightly;
+import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.TestUtil;
-
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
@@ -51,9 +51,8 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.util.TimeOut;
import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -96,7 +95,8 @@ public class TestStressThreadBackup extends SolrCloudTestCase {
.configure();
assertEquals(0, (CollectionAdminRequest.createCollection(DEFAULT_TEST_COLLECTION_NAME, "conf1", 1, 1)
- .process(cluster.getSolrClient()).getStatus()));
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient()).getStatus()));
adminClient = getHttpSolrClient(cluster.getJettySolrRunners().get(0).getBaseUrl().toString());
initCoreNameAndSolrCoreClient();
}
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/HealthCheckHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/admin/HealthCheckHandlerTest.java
index 1ee868b6481..25b66939f2f 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/HealthCheckHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/HealthCheckHandlerTest.java
@@ -80,6 +80,7 @@ public class HealthCheckHandlerTest extends SolrCloudTestCase {
try (HttpSolrClient httpSolrClient = getHttpSolrClient(cluster.getJettySolrRunner(0).getBaseUrl().toString())) {
CollectionAdminResponse collectionAdminResponse = CollectionAdminRequest.createCollection("test", "_default", 1, 1)
.withProperty("solr.directoryFactory", "solr.StandardDirectoryFactory")
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(httpSolrClient);
assertEquals(0, collectionAdminResponse.getStatus());
SolrResponse response = req.process(httpSolrClient);
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
index ea536beb6ca..835c8b190cc 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
@@ -78,7 +78,8 @@ public class MetricsHistoryHandlerTest extends SolrCloudTestCase {
// create .system collection
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(CollectionAdminParams.SYSTEM_COLL,
- "conf", 1, 1);
+ "conf", 1, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE);
create.process(solrClient);
CloudUtil.waitForState(cloudManager, "failed to create " + CollectionAdminParams.SYSTEM_COLL,
CollectionAdminParams.SYSTEM_COLL, CloudUtil.clusterShape(1, 1));
diff --git a/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
index f0b29732904..aa17b551c2f 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
@@ -30,6 +30,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -140,6 +141,7 @@ public class SearchHandlerTest extends SolrTestCaseJ4
miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1/conf"), configName);
CollectionAdminRequest.createCollection(collectionName, configName, 2, 2)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(miniCluster.getSolrClient());
QueryRequest req = new QueryRequest();
@@ -182,6 +184,7 @@ public class SearchHandlerTest extends SolrTestCaseJ4
miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1/conf"), configName);
CollectionAdminRequest.createCollection(collectionName, configName, 2, 2)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(miniCluster.getSolrClient());
ModifiableSolrParams params = new ModifiableSolrParams();
@@ -229,6 +232,7 @@ public class SearchHandlerTest extends SolrTestCaseJ4
miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1/conf"), configName);
CollectionAdminRequest.createCollection(collectionName, configName, 2, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(miniCluster.getSolrClient());
ModifiableSolrParams params = new ModifiableSolrParams();
diff --git a/solr/core/src/test/org/apache/solr/response/transform/TestSubQueryTransformerDistrib.java b/solr/core/src/test/org/apache/solr/response/transform/TestSubQueryTransformerDistrib.java
index 0f9221c6af8..524dff96740 100644
--- a/solr/core/src/test/org/apache/solr/response/transform/TestSubQueryTransformerDistrib.java
+++ b/solr/core/src/test/org/apache/solr/response/transform/TestSubQueryTransformerDistrib.java
@@ -78,10 +78,12 @@ public class TestSubQueryTransformerDistrib extends SolrCloudTestCase {
int replicas = 2 ;
CollectionAdminRequest.createCollection(people, configName, shards, replicas)
.withProperty("config", "solrconfig-doctransformers.xml")
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.withProperty("schema", "schema-docValuesJoin.xml")
.process(cluster.getSolrClient());
CollectionAdminRequest.createCollection(depts, configName, shards, replicas)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.withProperty("config", "solrconfig-doctransformers.xml")
.withProperty("schema",
differentUniqueId ? "schema-minimal-with-another-uniqkey.xml":
diff --git a/solr/core/src/test/org/apache/solr/schema/TestManagedSchemaAPI.java b/solr/core/src/test/org/apache/solr/schema/TestManagedSchemaAPI.java
index a2375bafd95..f022ac3b2d2 100644
--- a/solr/core/src/test/org/apache/solr/schema/TestManagedSchemaAPI.java
+++ b/solr/core/src/test/org/apache/solr/schema/TestManagedSchemaAPI.java
@@ -51,6 +51,8 @@ public class TestManagedSchemaAPI extends SolrCloudTestCase {
public void test() throws Exception {
String collection = "testschemaapi";
CollectionAdminRequest.createCollection(collection, "conf1", 1, 2)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+
.process(cluster.getSolrClient());
testModifyField(collection);
testReloadAndAddSimple(collection);
diff --git a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetJoinDomain.java b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetJoinDomain.java
index a88ed95cc72..c1dc54976cf 100644
--- a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetJoinDomain.java
+++ b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetJoinDomain.java
@@ -109,6 +109,7 @@ public class TestCloudJSONFacetJoinDomain extends SolrCloudTestCase {
collectionProperties.put("config", "solrconfig-tlog.xml");
collectionProperties.put("schema", "schema_latest.xml");
CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.setProperties(collectionProperties)
.process(cluster.getSolrClient());
diff --git a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKG.java b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKG.java
index f2b67d5428d..f0c3d5613bb 100644
--- a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKG.java
+++ b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKG.java
@@ -138,6 +138,7 @@ public class TestCloudJSONFacetSKG extends SolrCloudTestCase {
collectionProperties.put("config", "solrconfig-tlog.xml");
collectionProperties.put("schema", "schema_latest.xml");
CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.setProperties(collectionProperties)
.process(cluster.getSolrClient());
diff --git a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKGEquiv.java b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKGEquiv.java
index 8e0959373db..e3d808508fc 100644
--- a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKGEquiv.java
+++ b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKGEquiv.java
@@ -132,6 +132,7 @@ public class TestCloudJSONFacetSKGEquiv extends SolrCloudTestCase {
collectionProperties.put("config", "solrconfig-tlog.xml");
collectionProperties.put("schema", "schema_latest.xml");
CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.setProperties(collectionProperties)
.process(cluster.getSolrClient());
diff --git a/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java b/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
index ebdb960164a..a42ebaa35db 100644
--- a/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
+++ b/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
@@ -54,9 +54,11 @@ public class CrossCollectionJoinQueryTest extends SolrCloudTestCase {
CollectionAdminRequest.createCollection("products", "ccjoin", NUM_SHARDS, NUM_REPLICAS)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
CollectionAdminRequest.createCollection("parts", "ccjoin", NUM_SHARDS, NUM_REPLICAS)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
}
diff --git a/solr/core/src/test/org/apache/solr/search/stats/TestDistribIDF.java b/solr/core/src/test/org/apache/solr/search/stats/TestDistribIDF.java
index 4fc1f3a7476..cf0ac7e9ae1 100644
--- a/solr/core/src/test/org/apache/solr/search/stats/TestDistribIDF.java
+++ b/solr/core/src/test/org/apache/solr/search/stats/TestDistribIDF.java
@@ -29,6 +29,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
@@ -200,7 +201,7 @@ public class TestDistribIDF extends SolrTestCaseJ4 {
response = create.process(solrCluster.getSolrClient());
solrCluster.waitForActiveCollection(name, 3, 3);
} else {
- CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(name,config,2,1);
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(name,config,2,1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE);
response = create.process(solrCluster.getSolrClient());
solrCluster.waitForActiveCollection(name, 2, 2);
}
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TemplateUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TemplateUpdateProcessorTest.java
index 6eb122a3f48..c8ae8b25641 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/TemplateUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/TemplateUpdateProcessorTest.java
@@ -88,7 +88,7 @@ public class TemplateUpdateProcessorTest extends SolrCloudTestCase {
params.add("commit", "true");
UpdateRequest add = new UpdateRequest().add(solrDoc);
add.setParams(params);
- NamedList
*/
- DOWN,
+ DOWN("D"),
/**
* The node is recovering from the leader. This might involve peer-sync,
* full replication or finding out things are already in sync.
*/
- RECOVERING,
+ RECOVERING("R"),
/**
* Recovery attempts have not worked, something is not right.
@@ -80,8 +83,16 @@ public class Replica extends ZkNodeProps implements MapWriter {
* cluster and it's state should be discarded.
*
*/
- RECOVERY_FAILED;
-
+ RECOVERY_FAILED("F");
+
+ /**short name for a state. Used to encode this in the state node see {@link PerReplicaStates.State}
+ */
+ public final String shortName;
+
+ State(String c) {
+ this.shortName = c;
+ }
+
@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
@@ -124,6 +135,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
public final String core;
public final Type type;
public final String shard, collection;
+ private PerReplicaStates.State replicaState;
// mutable
private State state;
@@ -137,6 +149,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
this.node = (String) propMap.get(ZkStateReader.NODE_NAME_PROP);
this.core = (String) propMap.get(ZkStateReader.CORE_NAME_PROP);
this.type = Type.get((String) propMap.get(ZkStateReader.REPLICA_TYPE));
+ readPrs();
// default to ACTIVE
this.state = State.getState(String.valueOf(propMap.getOrDefault(ZkStateReader.STATE_PROP, State.ACTIVE.toString())));
validate();
@@ -158,6 +171,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
if (props != null) {
this.propMap.putAll(props);
}
+ readPrs();
validate();
propMap.put(BASE_URL_PROP, UrlScheme.INSTANCE.getBaseUrlForNodeName(this.node));
}
@@ -177,13 +191,26 @@ public class Replica extends ZkNodeProps implements MapWriter {
this.shard = String.valueOf(details.get("shard"));
this.core = String.valueOf(details.get("core"));
this.node = String.valueOf(details.get("node_name"));
- type = Replica.Type.valueOf(String.valueOf(details.getOrDefault(ZkStateReader.REPLICA_TYPE, "NRT")));
- state = State.getState(String.valueOf(details.getOrDefault(ZkStateReader.STATE_PROP, "active")));
+
this.propMap.putAll(details);
+ readPrs();
+ type = Replica.Type.valueOf(String.valueOf(propMap.getOrDefault(ZkStateReader.REPLICA_TYPE, "NRT")));
+ if(state == null) state = State.getState(String.valueOf(propMap.getOrDefault(ZkStateReader.STATE_PROP, "active")));
validate();
propMap.put(BASE_URL_PROP, UrlScheme.INSTANCE.getBaseUrlForNodeName(this.node));
}
+ private void readPrs() {
+ ClusterState.getReplicaStatesProvider().get().ifPresent(it -> {
+ log.debug("A replica {} state fetched from per-replica state", name);
+ replicaState = it.getStates().get(name);
+ if(replicaState!= null) {
+ propMap.put(ZkStateReader.STATE_PROP, replicaState.state.toString().toLowerCase(Locale.ROOT));
+ if (replicaState.isLeader) propMap.put(Slice.LEADER, "true");
+ }
+ }) ;
+ }
+
private final void validate() {
Objects.requireNonNull(this.name, "'name' must not be null");
Objects.requireNonNull(this.core, "'core' must not be null");
@@ -295,6 +322,24 @@ public class Replica extends ZkNodeProps implements MapWriter {
final String propertyValue = getStr(propertyKey);
return propertyValue;
}
+ public Replica copyWith(PerReplicaStates.State state) {
+ log.debug("A replica is updated with new state : {}", state);
+ Map props = new LinkedHashMap<>(propMap);
+ if (state == null) {
+ props.put(ZkStateReader.STATE_PROP, State.DOWN.toString());
+ props.remove(Slice.LEADER);
+ } else {
+ props.put(ZkStateReader.STATE_PROP, state.state.toString());
+ if (state.isLeader) props.put(Slice.LEADER, "true");
+ }
+ Replica r = new Replica(name, props, collection, shard);
+ r.replicaState = state;
+ return r;
+ }
+
+ public PerReplicaStates.State getReplicaState() {
+ return replicaState;
+ }
public Object clone() {
return new Replica(name, node, collection, shard, core, state, type, propMap);
@@ -305,6 +350,17 @@ public class Replica extends ZkNodeProps implements MapWriter {
ew.put(name, _allPropsWriter());
}
+ private static final Map STATES = new HashMap<>();
+ static {
+ STATES.put(Replica.State.ACTIVE.shortName, Replica.State.ACTIVE);
+ STATES.put(Replica.State.DOWN.shortName, Replica.State.DOWN);
+ STATES.put(Replica.State.RECOVERING.shortName, Replica.State.RECOVERING);
+ STATES.put(Replica.State.RECOVERY_FAILED.shortName, Replica.State.RECOVERY_FAILED);
+ }
+ public static State getState(String shortName) {
+ return STATES.get(shortName);
+ }
+
private MapWriter _allPropsWriter() {
BiPredicate p = dedupeKeyPredicate(new HashSet<>())
@@ -330,7 +386,6 @@ public class Replica extends ZkNodeProps implements MapWriter {
.put(ZkStateReader.STATE_PROP, state.toString(), p);
};
}
-
@Override
public void write(JSONWriter jsonWriter) {
Map map = new LinkedHashMap<>();
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
index 4378ef761f3..23796fbf332 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.common.cloud;
+import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -25,11 +26,14 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.solr.common.cloud.Replica.Type;
import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.solr.common.util.Utils.toJSONString;
@@ -37,6 +41,8 @@ import static org.apache.solr.common.util.Utils.toJSONString;
* A Slice contains immutable information about a logical shard (all replicas that share the same shard id).
*/
public class Slice extends ZkNodeProps implements Iterable {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
public final String collection;
/** Loads multiple slices into a Map from a generic Map that probably came from deserialized JSON. */
@@ -61,6 +67,16 @@ public class Slice extends ZkNodeProps implements Iterable {
return replicas.values().iterator();
}
+ /**Make a copy with a modified replica
+ */
+ public Slice copyWith(Replica modified) {
+ if(log.isDebugEnabled()) {
+ log.debug("modified replica : {}", modified);
+ }
+ Map replicasCopy = new LinkedHashMap<>(replicas);
+ replicasCopy.put(modified.getName(), modified);
+ return new Slice(name, replicasCopy, propMap, collection);
+ }
/** The slice's state. */
public enum State {
@@ -210,7 +226,7 @@ public class Slice extends ZkNodeProps implements Iterable {
private Replica findLeader() {
for (Replica replica : replicas.values()) {
- if (replica.getStr(LEADER) != null) {
+ if (replica.isLeader()) {
assert replica.getType() == Type.TLOG || replica.getType() == Type.NRT: "Pull replica should not become leader!";
return replica;
}
@@ -235,6 +251,10 @@ public class Slice extends ZkNodeProps implements Iterable {
return replicas.values();
}
+ public Set getReplicaNames() {
+ return Collections.unmodifiableSet(replicas.keySet());
+ }
+
/**
* Gets all replicas that match a predicate
*/
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 1ac21fe5d9e..7a12c92ce08 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -330,6 +330,18 @@ public class SolrZkClient implements Closeable {
}
}
+ /**
+ * Returns children of the node at the path
+ */
+ public List getChildren(final String path, final Watcher watcher,Stat stat, boolean retryOnConnLoss)
+ throws KeeperException, InterruptedException {
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(() -> keeper.getChildren(path, wrapWatcher(watcher) , stat));
+ } else {
+ return keeper.getChildren(path, wrapWatcher(watcher), stat);
+ }
+ }
+
/**
* Returns node's data
*/
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 7ff69cfbe0a..16b9d72327d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -655,7 +655,7 @@ public class ZkStateReader implements SolrCloseable {
private class LazyCollectionRef extends ClusterState.CollectionRef {
private final String collName;
- private long lastUpdateTime;
+ private volatile long lastUpdateTime;
private DocCollection cachedDocCollection;
public LazyCollectionRef(String collName) {
@@ -670,12 +670,12 @@ public class ZkStateReader implements SolrCloseable {
if (!allowCached || lastUpdateTime < 0 || System.nanoTime() - lastUpdateTime > LAZY_CACHE_TIME) {
boolean shouldFetch = true;
if (cachedDocCollection != null) {
- Stat exists = null;
+ Stat freshStats = null;
try {
- exists = zkClient.exists(getCollectionPath(collName), null, true);
+ freshStats = zkClient.exists(getCollectionPath(collName), null, true);
} catch (Exception e) {
}
- if (exists != null && exists.getVersion() == cachedDocCollection.getZNodeVersion()) {
+ if (freshStats != null && !cachedDocCollection.isModified(freshStats.getVersion(), freshStats.getCversion())) {
shouldFetch = false;
}
}
@@ -853,14 +853,16 @@ public class ZkStateReader implements SolrCloseable {
* Get shard leader properties, with retry if none exist.
*/
public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException {
-
+ AtomicReference coll = new AtomicReference<>();
AtomicReference leader = new AtomicReference<>();
try {
waitForState(collection, timeout, TimeUnit.MILLISECONDS, (n, c) -> {
if (c == null)
return false;
+ coll.set(c);
Replica l = getLeader(n, c, shard);
if (l != null) {
+ log.debug("leader found for {}/{} to be {}", collection, shard, l);
leader.set(l);
return true;
}
@@ -1195,9 +1197,11 @@ public class ZkStateReader implements SolrCloseable {
*/
class StateWatcher implements Watcher {
private final String coll;
+ private final String collectionPath;
StateWatcher(String coll) {
this.coll = coll;
+ collectionPath = getCollectionPath(coll);
}
@Override
@@ -1219,17 +1223,28 @@ public class ZkStateReader implements SolrCloseable {
event, coll, liveNodes.size());
}
- refreshAndWatch();
+ refreshAndWatch(event.getType());
}
+ public void refreshAndWatch() {
+ refreshAndWatch(null);
+ }
/**
* Refresh collection state from ZK and leave a watch for future changes.
* As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates}
* with the results of the refresh.
*/
- public void refreshAndWatch() {
+ public void refreshAndWatch(EventType eventType) {
try {
+ if (eventType == null || eventType == EventType.NodeChildrenChanged) {
+ refreshAndWatchChildren();
+ if (eventType == EventType.NodeChildrenChanged) {
+ //only per-replica states modified. return
+ return;
+ }
+ }
+
DocCollection newState = fetchCollectionState(coll, this);
updateWatchedCollection(coll, newState);
synchronized (getUpdateLock()) {
@@ -1246,6 +1261,29 @@ public class ZkStateReader implements SolrCloseable {
log.error("Unwatched collection: [{}]", coll, e);
}
}
+
+ private void refreshAndWatchChildren() throws KeeperException, InterruptedException {
+ Stat stat = new Stat();
+ List replicaStates = null;
+ try {
+ replicaStates = zkClient.getChildren(collectionPath, this, stat, true);
+ PerReplicaStates newStates = new PerReplicaStates(collectionPath, stat.getCversion(), replicaStates);
+ DocCollection oldState = watchedCollectionStates.get(coll);
+ final DocCollection newState = oldState != null ?
+ oldState.copyWith(newStates) :
+ fetchCollectionState(coll, null);
+ updateWatchedCollection(coll, newState);
+ synchronized (getUpdateLock()) {
+ constructState(Collections.singleton(coll));
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("updated per-replica states changed for: {}, ver: {} , new vals: {}", coll, stat.getCversion(), replicaStates);
+ }
+
+ } catch (NoNodeException e) {
+ log.info("{} is deleted, stop watching children", collectionPath);
+ }
+ }
}
/**
@@ -1422,6 +1460,16 @@ public class ZkStateReader implements SolrCloseable {
private DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
String collectionPath = getCollectionPath(coll);
while (true) {
+ ClusterState.initReplicaStateProvider(() -> {
+ try {
+ PerReplicaStates replicaStates = PerReplicaStates.fetch(collectionPath, zkClient, null);
+ log.info("per-replica-state ver: {} fetched for initializing {} ", replicaStates.cversion, collectionPath);
+ return replicaStates;
+ } catch (Exception e) {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ });
try {
Stat stat = new Stat();
byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
@@ -1439,6 +1487,8 @@ public class ZkStateReader implements SolrCloseable {
}
}
return null;
+ } finally {
+ ClusterState.clearReplicaStateProvider();
}
}
}
@@ -1566,11 +1616,28 @@ public class ZkStateReader implements SolrCloseable {
}
DocCollection state = clusterState.getCollectionOrNull(collection);
+ state = updatePerReplicaState(state);
if (stateWatcher.onStateChanged(state) == true) {
removeDocCollectionWatcher(collection, stateWatcher);
}
}
+ private DocCollection updatePerReplicaState(DocCollection c) {
+ if (c == null || !c.isPerReplicaState()) return c;
+ PerReplicaStates current = c.getPerReplicaStates();
+ PerReplicaStates newPrs = PerReplicaStates.fetch(c.getZNode(), zkClient, current);
+ if (newPrs != current) {
+ if(log.isDebugEnabled()) {
+ log.debug("update for a fresh per-replica-state {}", c.getName());
+ }
+ DocCollection modifiedColl = c.copyWith(newPrs);
+ updateWatchedCollection(c.getName(), modifiedColl);
+ return modifiedColl;
+ } else {
+ return c;
+ }
+ }
+
/**
* Block until a CollectionStatePredicate returns true, or the wait times out
*
@@ -1804,7 +1871,9 @@ public class ZkStateReader implements SolrCloseable {
break;
}
} else {
- if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
+ int oldCVersion = oldState.getPerReplicaStates() == null ? -1 : oldState.getPerReplicaStates().cversion;
+ int newCVersion = newState.getPerReplicaStates() == null ? -1 : newState.getPerReplicaStates().cversion;
+ if (oldState.getZNodeVersion() >= newState.getZNodeVersion() && oldCVersion >= newCVersion) {
// no change to state, but we might have been triggered by the addition of a
// state watcher, so run notifications
updated = true;
@@ -2199,6 +2268,7 @@ public class ZkStateReader implements SolrCloseable {
}
public DocCollection getCollection(String collection) {
- return clusterState.getCollectionOrNull(collection);
+ return clusterState == null ? null : clusterState.getCollectionOrNull(collection);
}
+
}
diff --git a/solr/solrj/src/resources/apispec/collections.Commands.json b/solr/solrj/src/resources/apispec/collections.Commands.json
index c3abc604e66..747cc8b8b28 100644
--- a/solr/solrj/src/resources/apispec/collections.Commands.json
+++ b/solr/solrj/src/resources/apispec/collections.Commands.json
@@ -90,6 +90,11 @@
"type": "boolean",
"description": "If true then request will complete only when all affected replicas become active.",
"default": false
+ },
+ "perReplicaState": {
+ "type": "boolean",
+ "description": "Use Per replica states",
+ "default": false
}
},
"required": [
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/IndexingNestedDocuments.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/IndexingNestedDocuments.java
index 9d22119c0af..c853c7ef782 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/IndexingNestedDocuments.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/IndexingNestedDocuments.java
@@ -25,14 +25,11 @@ import java.util.List;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-
import org.apache.solr.cloud.SolrCloudTestCase;
-
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.util.ExternalPaths;
-
import org.junit.After;
import org.junit.BeforeClass;
@@ -70,7 +67,9 @@ public class IndexingNestedDocuments extends SolrCloudTestCase {
*/
public void testIndexingAnonKids() throws Exception {
final String collection = "test_anon";
- CollectionAdminRequest.createCollection(collection, ANON_KIDS_CONFIG, 1, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(collection, ANON_KIDS_CONFIG, 1, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.getSolrClient().setDefaultCollection(collection);
//
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiHeatmapFacetingTest.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiHeatmapFacetingTest.java
index 4698c7ebe32..3915a002f09 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiHeatmapFacetingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiHeatmapFacetingTest.java
@@ -57,7 +57,9 @@ public class JsonRequestApiHeatmapFacetingTest extends SolrCloudTestCase {
final List solrUrls = new ArrayList<>();
solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
- CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
indexSpatialData();
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiTest.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiTest.java
index 83fe1c39ca4..fe0f3169d14 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiTest.java
@@ -64,7 +64,8 @@ public class JsonRequestApiTest extends SolrCloudTestCase {
final List solrUrls = new ArrayList<>();
solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
- CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update");
up.setParam("collection", COLLECTION_NAME);
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
index 04776ccec9c..acb6d3e69ec 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
@@ -69,6 +69,7 @@ public class UsingSolrJRefGuideExamplesTest extends SolrCloudTestCase {
.configure();
CollectionAdminResponse response = CollectionAdminRequest.createCollection("techproducts", "conf", 1, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection("techproducts", 1, 1);
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index d6dc6fc576a..e156607561e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -47,6 +47,7 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.client.solrj.response.SolrPingResponse;
@@ -60,8 +61,10 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
@@ -71,6 +74,7 @@ import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.handler.admin.CoreAdminHandler;
+import org.apache.solr.util.LogLevel;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -80,11 +84,14 @@ import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
+
/**
* This test would be faster if we simulated the zk state instead.
*/
@Slow
+@LogLevel("org.apache.solr.cloud.Overseer=INFO;org.apache.solr.common.cloud=INFO;org.apache.solr.cloud.api.collections=INFO;org.apache.solr.cloud.overseer=INFO")
public class CloudSolrClientTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -140,7 +147,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
@Test
public void testParallelUpdateQTime() throws Exception {
- CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
+ .setPerReplicaState(USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION, 2, 2);
UpdateRequest req = new UpdateRequest();
for (int i=0; i<10; i++) {
@@ -157,6 +166,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
public void testOverwriteOption() throws Exception {
CollectionAdminRequest.createCollection("overwrite", "conf", 1, 1)
+ .setPerReplicaState(USE_PER_REPLICA_STATE)
.processAndWait(cluster.getSolrClient(), TIMEOUT);
cluster.waitForActiveCollection("overwrite", 1, 1);
@@ -180,10 +190,14 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
@Test
public void testAliasHandling() throws Exception {
- CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
+ .setPerReplicaState(USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION, 2, 2);
- CollectionAdminRequest.createCollection(COLLECTION2, "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(COLLECTION2, "conf", 2, 1)
+ .setPerReplicaState(USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION2, 2, 2);
CloudSolrClient client = getRandomClient();
@@ -228,7 +242,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
@Test
public void testRouting() throws Exception {
- CollectionAdminRequest.createCollection("routing_collection", "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("routing_collection", "conf", 2, 1)
+ .setPerReplicaState(USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection("routing_collection", 2, 2);
AbstractUpdateRequest request = new UpdateRequest()
@@ -412,6 +428,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
// all its cores on the same node.
// Hence the below configuration for our collection
CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, liveNodes)
+ .setPerReplicaState(USE_PER_REPLICA_STATE)
.processAndWait(cluster.getSolrClient(), TIMEOUT);
cluster.waitForActiveCollection(collectionName, liveNodes, liveNodes * liveNodes);
// Add some new documents
@@ -482,7 +499,8 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
int liveNodes = cluster.getJettySolrRunners().size();
// For testing replica.type, we want to have all replica types available for the collection
- CollectionAdminRequest.createCollection(collectionName, "conf", 1, liveNodes/3, liveNodes/3, liveNodes/3)
+ CollectionAdminRequest.createCollection(collectionName, "conf", 1, liveNodes / 3, liveNodes / 3, liveNodes / 3)
+ .setPerReplicaState(USE_PER_REPLICA_STATE)
.processAndWait(cluster.getSolrClient(), TIMEOUT);
cluster.waitForActiveCollection(collectionName, 1, liveNodes);
@@ -632,8 +650,10 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
String async1 = CollectionAdminRequest.createCollection("multicollection1", "conf", 2, 1)
+ .setPerReplicaState(USE_PER_REPLICA_STATE)
.processAsync(client);
String async2 = CollectionAdminRequest.createCollection("multicollection2", "conf", 2, 1)
+ .setPerReplicaState(USE_PER_REPLICA_STATE)
.processAsync(client);
CollectionAdminRequest.waitForAsyncRequest(async1, client, TIMEOUT);
@@ -776,7 +796,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
@Test
public void testVersionsAreReturned() throws Exception {
- CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1)
+ .setPerReplicaState(USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection("versions_collection", 2, 2);
// assert that "adds" are returned
@@ -825,7 +847,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
@Test
public void testInitializationWithSolrUrls() throws Exception {
- CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
+ .setPerReplicaState(USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION, 2, 2);
CloudSolrClient client = httpBasedCloudSolrClient;
SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
@@ -1021,7 +1045,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
@Test
public void testPing() throws Exception {
final String testCollection = "ping_test";
- CollectionAdminRequest.createCollection(testCollection, "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(testCollection, "conf", 2, 1)
+ .setPerReplicaState(USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection(testCollection, 2, 2);
final SolrClient clientUnderTest = getRandomClient();
@@ -1030,4 +1056,34 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
assertEquals("This should be OK", 0, response.getStatus());
}
+ public void testPerReplicaStateCollection() throws Exception {
+ CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1)
+ .process(cluster.getSolrClient());
+
+ String testCollection = "perReplicaState_test";
+ int liveNodes = cluster.getJettySolrRunners().size();
+ CollectionAdminRequest.createCollection(testCollection, "conf", 2, 2)
+ .setPerReplicaState(Boolean.TRUE)
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(testCollection, 2, 4);
+ final SolrClient clientUnderTest = getRandomClient();
+ final SolrPingResponse response = clientUnderTest.ping(testCollection);
+ assertEquals("This should be OK", 0, response.getStatus());
+ DocCollection c = cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+ c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
+ PerReplicaStates prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
+ assertEquals(4, prs.states.size());
+ testCollection = "perReplicaState_testv2";
+ new V2Request.Builder("/collections")
+ .withMethod(POST)
+ .withPayload("{create: {name: perReplicaState_testv2, config : conf, numShards : 2, nrtReplicas : 2, perReplicaState : true}}")
+ .build()
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(testCollection, 2, 4);
+ c = cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+ c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
+ prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
+ assertEquals(4, prs.states.size());
+ }
+
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
index e8aef51421c..f8393143074 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
@@ -41,15 +41,14 @@ import org.apache.solr.common.util.Utils;
import org.apache.solr.security.BasicAuthPlugin;
import org.apache.solr.security.RuleBasedAuthorizationPlugin;
import org.apache.solr.util.TimeOut;
-
-import static org.apache.solr.security.Sha256AuthenticationProvider.getSaltedHashedValue;
-
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.security.Sha256AuthenticationProvider.getSaltedHashedValue;
+
/**
* tests various streaming expressions (via the SolrJ {@link SolrStream} API) against a SolrCloud cluster
* using both Authenticationand Role based Authorization
@@ -126,7 +125,8 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
for (String collection : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
CollectionAdminRequest.createCollection(collection, "_default", 2, 2)
- .setBasicAuthCredentials(ADMIN_USER, ADMIN_USER)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .setBasicAuthCredentials(ADMIN_USER, ADMIN_USER)
.process(cluster.getSolrClient());
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
index 8b74a66b579..a7abfadfa65 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
@@ -74,7 +74,9 @@ public class JDBCStreamTest extends SolrCloudTestCase {
} else {
collection = COLLECTIONORALIAS;
}
- CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
if (useAlias) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
index edef2698f76..b7eaa64a580 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
@@ -65,7 +65,10 @@ public class MathExpressionTest extends SolrCloudTestCase {
collection = COLLECTIONORALIAS;
}
- CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
if (useAlias) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
index add4331b6c4..2fa0dd09335 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
@@ -68,7 +68,9 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
} else {
collection = COLLECTIONORALIAS;
}
- CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
if (useAlias) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
index 6c88ffee9c1..4d775402737 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
@@ -96,7 +96,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
collection = COLLECTIONORALIAS;
}
- CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(collection, "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection(collection, 2, 2);
@@ -2679,7 +2680,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
@Test
public void testUpdateStream() throws Exception {
- CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection("destinationCollection", 2, 2);
new UpdateRequest()
@@ -2773,7 +2775,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
@Test
public void testParallelUpdateStream() throws Exception {
- CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection("parallelDestinationCollection", 2, 2);
new UpdateRequest()
@@ -2871,7 +2874,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
@Test
public void testParallelDaemonUpdateStream() throws Exception {
- CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection("parallelDestinationCollection1", 2, 2);
new UpdateRequest()
@@ -3045,7 +3049,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
public void testParallelTerminatingDaemonUpdateStream() throws Exception {
Assume.assumeTrue(!useAlias);
- CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection("parallelDestinationCollection1", 2, 2);
new UpdateRequest()
@@ -3231,7 +3236,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
@Test
public void testCommitStream() throws Exception {
- CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection("destinationCollection", 2, 2);
new UpdateRequest()
@@ -3324,7 +3330,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
@Test
public void testParallelCommitStream() throws Exception {
- CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection("parallelDestinationCollection", 2, 2);
new UpdateRequest()
@@ -3422,7 +3429,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
@Test
public void testParallelDaemonCommitStream() throws Exception {
- CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection("parallelDestinationCollection1", 2, 2);
new UpdateRequest()
@@ -3639,11 +3647,14 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
public void testClassifyStream() throws Exception {
Assume.assumeTrue(!useAlias);
- CollectionAdminRequest.createCollection("modelCollection", "ml", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("modelCollection", "ml", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection("modelCollection", 2, 2);
- CollectionAdminRequest.createCollection("uknownCollection", "ml", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("uknownCollection", "ml", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection("uknownCollection", 2, 2);
- CollectionAdminRequest.createCollection("checkpointCollection", "ml", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("checkpointCollection", "ml", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection("checkpointCollection", 2, 2);
UpdateRequest updateRequest = new UpdateRequest();
@@ -3867,11 +3878,14 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
@Test
public void testExecutorStream() throws Exception {
- CollectionAdminRequest.createCollection("workQueue", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+ CollectionAdminRequest.createCollection("workQueue", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
cluster.waitForActiveCollection("workQueue", 2, 2);
- CollectionAdminRequest.createCollection("mainCorpus", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+ CollectionAdminRequest.createCollection("mainCorpus", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
cluster.waitForActiveCollection("mainCorpus", 2, 2);
- CollectionAdminRequest.createCollection("destination", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+ CollectionAdminRequest.createCollection("destination", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
cluster.waitForActiveCollection("destination", 2, 2);
UpdateRequest workRequest = new UpdateRequest();
@@ -3933,11 +3947,14 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
@Test
public void testParallelExecutorStream() throws Exception {
- CollectionAdminRequest.createCollection("workQueue1", "conf", 2, 1).processAndWait(cluster.getSolrClient(),DEFAULT_TIMEOUT);
+ CollectionAdminRequest.createCollection("workQueue1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .processAndWait(cluster.getSolrClient(),DEFAULT_TIMEOUT);
- CollectionAdminRequest.createCollection("mainCorpus1", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+ CollectionAdminRequest.createCollection("mainCorpus1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
- CollectionAdminRequest.createCollection("destination1", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+ CollectionAdminRequest.createCollection("destination1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
cluster.waitForActiveCollection("workQueue1", 2, 2);
cluster.waitForActiveCollection("mainCorpus1", 2, 2);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/DirectJsonQueryRequestFacetingIntegrationTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/DirectJsonQueryRequestFacetingIntegrationTest.java
index 48f13c2135e..6a3497150fb 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/DirectJsonQueryRequestFacetingIntegrationTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/DirectJsonQueryRequestFacetingIntegrationTest.java
@@ -24,10 +24,10 @@ import java.util.List;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
-import org.apache.solr.client.solrj.response.json.BucketJsonFacet;
-import org.apache.solr.client.solrj.response.json.NestableJsonFacet;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.client.solrj.response.json.BucketJsonFacet;
+import org.apache.solr.client.solrj.response.json.NestableJsonFacet;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.util.ExternalPaths;
@@ -56,7 +56,9 @@ public class DirectJsonQueryRequestFacetingIntegrationTest extends SolrCloudTest
final List solrUrls = new ArrayList<>();
solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
- CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update");
up.setParam("collection", COLLECTION_NAME);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestFacetingIntegrationTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestFacetingIntegrationTest.java
index 291bcba149a..94a46c4a60b 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestFacetingIntegrationTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestFacetingIntegrationTest.java
@@ -29,10 +29,10 @@ import java.util.Map;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
-import org.apache.solr.client.solrj.response.json.BucketJsonFacet;
-import org.apache.solr.client.solrj.response.json.NestableJsonFacet;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.client.solrj.response.json.BucketJsonFacet;
+import org.apache.solr.client.solrj.response.json.NestableJsonFacet;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrDocumentList;
@@ -63,7 +63,9 @@ public class JsonQueryRequestFacetingIntegrationTest extends SolrCloudTestCase {
final List solrUrls = new ArrayList<>();
solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
- CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update");
up.setParam("collection", COLLECTION_NAME);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestHeatmapFacetingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestHeatmapFacetingTest.java
index 1ccd581628e..977d4055e0d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestHeatmapFacetingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestHeatmapFacetingTest.java
@@ -47,7 +47,9 @@ public class JsonQueryRequestHeatmapFacetingTest extends SolrCloudTestCase {
final List solrUrls = new ArrayList<>();
solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
- CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
indexSpatialData();
}
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
index b0de38379a7..37afd382f94 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
@@ -97,6 +97,7 @@ public class TestCloudCollectionsListeners extends SolrCloudTestCase {
assertFalse("CloudCollectionsListener not triggered after registration", newResults.get(2).contains("testcollection1"));
CollectionAdminRequest.createCollection("testcollection1", "config", 4, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("testcollection1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
@@ -110,6 +111,7 @@ public class TestCloudCollectionsListeners extends SolrCloudTestCase {
client.getZkStateReader().removeCloudCollectionsListener(watcher1);
CollectionAdminRequest.createCollection("testcollection2", "config", 4, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.processAndWait(client, MAX_WAIT_TIMEOUT);
cluster.waitForActiveCollection("testcollection2", 4, 4);
@@ -136,10 +138,12 @@ public class TestCloudCollectionsListeners extends SolrCloudTestCase {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("testcollection1", "config", 4, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.processAndWait(client, MAX_WAIT_TIMEOUT);
cluster.waitForActiveCollection("testcollection1", 4, 4);
CollectionAdminRequest.createCollection("testcollection2", "config", 4, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.processAndWait(client, MAX_WAIT_TIMEOUT);
cluster.waitForActiveCollection("testcollection2", 4, 4);
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
index 8c19f3e48c2..94eae27b05c 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
@@ -33,6 +33,7 @@ import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.util.ExecutorUtil;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,7 +122,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
// note: one node in our cluster is unsed by collection
CollectionAdminRequest.createCollection("testcollection", "config", CLUSTER_SIZE, 1)
- .processAndWait(client, MAX_WAIT_TIMEOUT);
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, CLUSTER_SIZE, 1));
@@ -169,7 +171,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("currentstate", "config", 1, 1)
- .processAndWait(client, MAX_WAIT_TIMEOUT);
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .processAndWait(client, MAX_WAIT_TIMEOUT);
final CountDownLatch latch = new CountDownLatch(1);
client.registerCollectionStateWatcher("currentstate", (n, c) -> {
@@ -199,7 +202,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("waitforstate", "config", 1, 1)
- .processAndWait(client, MAX_WAIT_TIMEOUT);
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
@@ -217,6 +221,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
}
@Test
+ @Ignore
public void testCanWaitForNonexistantCollection() throws Exception {
Future future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
@@ -247,7 +252,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("falsepredicate", "config", 4, 1)
- .processAndWait(client, MAX_WAIT_TIMEOUT);
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
@@ -301,7 +307,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
@Test
public void testDeletionsTriggerWatches() throws Exception {
CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1)
- .process(cluster.getSolrClient());
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
Future future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(l, c) -> c == null);
@@ -315,7 +322,9 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
public void testLiveNodeChangesTriggerWatches() throws Exception {
final CloudSolrClient client = cluster.getSolrClient();
- CollectionAdminRequest.createCollection("test_collection", "config", 1, 1).process(client);
+ CollectionAdminRequest.createCollection("test_collection", "config", 1, 1)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .process(client);
Future future = waitInBackground("test_collection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(l, c) -> (l.size() == 1 + CLUSTER_SIZE));
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
new file mode 100644
index 00000000000..b6ea6f75c80
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.zookeeper.CreateMode;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestPerReplicaStates extends SolrCloudTestCase {
+ @Before
+ public void prepareCluster() throws Exception {
+ configureCluster(4)
+ .configure();
+ }
+
+ @After
+ public void tearDownCluster() throws Exception {
+ shutdownCluster();
+ }
+
+ public void testBasic() {
+ PerReplicaStates.State rs = new PerReplicaStates.State("R1", State.ACTIVE, Boolean.FALSE, 1);
+ assertEquals("R1:1:A", rs.asString);
+
+ rs = new PerReplicaStates.State("R1", State.DOWN, Boolean.TRUE, 1);
+ assertEquals("R1:1:D:L", rs.asString);
+ rs = PerReplicaStates.State.parse (rs.asString);
+ assertEquals(State.DOWN, rs.state);
+
+ }
+
+ public void testEntries() {
+ PerReplicaStates entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R2:0:D", "R3:0:A"));
+ assertEquals(2, entries.get("R1").version);
+ entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:0:A", "R1:0:D"));
+ assertEquals(2, entries.get("R1").version);
+ assertEquals(2, entries.get("R1").getDuplicates().size());
+ Set modified = PerReplicaStates.findModifiedReplicas(entries, new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D")));
+ assertEquals(1, modified.size());
+ assertTrue(modified.contains("R3"));
+ modified = PerReplicaStates.findModifiedReplicas( entries,
+ new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D", "R4:0:A")));
+ assertEquals(2, modified.size());
+ assertTrue(modified.contains("R3"));
+ assertTrue(modified.contains("R4"));
+ modified = PerReplicaStates.findModifiedReplicas( entries,
+ new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R3:1:A", "R1:0:D", "R4:0:A")));
+ assertEquals(3, modified.size());
+ assertTrue(modified.contains("R3"));
+ assertTrue(modified.contains("R4"));
+ assertTrue(modified.contains("R2"));
+
+
+ }
+
+ public void testReplicaStateOperations() throws Exception {
+ String root = "/testReplicaStateOperations";
+ cluster.getZkClient().create(root, null, CreateMode.PERSISTENT, true);
+
+ ImmutableList states = ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R3:0:A", "R4:13:A");
+
+ for (String state : states) {
+ cluster.getZkClient().create(root + "/" + state, null, CreateMode.PERSISTENT, true);
+ }
+
+ ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+ PerReplicaStates rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+ assertEquals(3, rs.states.size());
+ assertTrue(rs.cversion >= 5);
+
+ PerReplicaStatesOps ops = PerReplicaStatesOps.addReplica("R5",State.ACTIVE, false, rs);
+ assertEquals(1, ops.get().size());
+ assertEquals(PerReplicaStates.Operation.Type.ADD , ops.ops.get(0).typ );
+ ops.persist(root,cluster.getZkClient());
+ rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+ assertEquals(4, rs.states.size());
+ assertTrue(rs.cversion >= 6);
+ assertEquals(6, cluster.getZkClient().getChildren(root, null,true).size());
+ ops = PerReplicaStatesOps.flipState("R1", State.DOWN , rs);
+
+ assertEquals(4, ops.ops.size());
+ assertEquals(PerReplicaStates.Operation.Type.ADD, ops.ops.get(0).typ);
+ assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.ops.get(1).typ);
+ assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.ops.get(2).typ);
+ assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.ops.get(3).typ);
+ ops.persist(root, cluster.getZkClient());
+ rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+ assertEquals(4, rs.states.size());
+ assertEquals(3, rs.states.get("R1").version);
+
+ ops = PerReplicaStatesOps.deleteReplica("R5" , rs);
+ assertEquals(1, ops.ops.size());
+ ops.persist(root,cluster.getZkClient());
+
+ rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+ assertEquals(3, rs.states.size());
+
+ ops = PerReplicaStatesOps.flipLeader(ImmutableSet.of("R4","R3","R1"), "R4",rs);
+ assertEquals(2, ops.ops.size());
+ assertEquals(PerReplicaStates.Operation.Type.ADD, ops.ops.get(0).typ);
+ assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.ops.get(1).typ);
+ ops.persist(root,cluster.getZkClient());
+ rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+ ops = PerReplicaStatesOps.flipLeader(ImmutableSet.of("R4","R3","R1"),"R3",rs);
+ assertEquals(4, ops.ops.size());
+ ops.persist(root,cluster.getZkClient());
+ rs =PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+ assertTrue(rs.get("R3").isLeader);
+ }
+
+}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index b646e2e82f6..c6f26c64d24 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -81,6 +81,7 @@ import org.slf4j.LoggerFactory;
public class SolrCloudTestCase extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static final Boolean USE_PER_REPLICA_STATE = Boolean.parseBoolean(System.getProperty("use.per-replica", "false"));
public static final int DEFAULT_TIMEOUT = 45; // this is an important timeout for test stability - can't be too short