mirror of https://github.com/apache/lucene.git

SOLR-7389: Expose znodeVersion property for each of the collections returned for the clusterstatus operation in the collections API

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1681776 13f79535-47bb-0310-9956-ffa450edef68

parent 194d92c53c
commit 49911c619b
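Note (not part of the committed diff): this change adds a per-collection "znodeVersion" entry to the CLUSTERSTATUS response; the hunks below touch the release notes (CHANGES), OverseerCollectionProcessor, and TestCollectionAPI. The following is a minimal SolrJ sketch of reading the new field, mirroring the test added at the end of this commit; the ZooKeeper address and collection name are illustrative assumptions, not values from the commit.

    import java.util.Map;

    import org.apache.solr.client.solrj.SolrRequest;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.request.QueryRequest;
    import org.apache.solr.common.params.CollectionParams;
    import org.apache.solr.common.params.ModifiableSolrParams;
    import org.apache.solr.common.util.NamedList;

    public class ClusterStatusZnodeVersionExample {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) throws Exception {
        // Illustrative ZooKeeper address and collection name; adjust for a real cluster.
        try (CloudSolrClient client = new CloudSolrClient("localhost:9983")) {
          ModifiableSolrParams params = new ModifiableSolrParams();
          params.set("action", CollectionParams.CollectionAction.CLUSTERSTATUS.toString());
          params.set("collection", "mycollection");
          SolrRequest request = new QueryRequest(params);
          request.setPath("/admin/collections");

          // Response shape used by the test below: cluster -> collections -> <name> -> znodeVersion.
          NamedList<Object> rsp = client.request(request);
          NamedList<Object> cluster = (NamedList<Object>) rsp.get("cluster");
          NamedList<Object> collections = (NamedList<Object>) cluster.get("collections");
          Map<String, Object> collection = (Map<String, Object>) collections.get("mycollection");
          Integer znodeVersion = (Integer) collection.get("znodeVersion");
          System.out.println("znodeVersion = " + znodeVersion);
        }
      }
    }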
@@ -79,6 +79,9 @@ New Features
 * SOLR-6273: Cross Data Center Replication. Active/passive replication for separate
   SolrClouds hosted on separate data centers. (Renaud Delbru, Yonik Seeley via Erick Erickson)
 
+* SOLR-7389: Expose znodeVersion property for each of the collections returned for the clusterstatus
+  operation in the collections API (Marius Grama via shalin)
+
 Bug Fixes
 ----------------------
 (no changes)

@@ -42,7 +42,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;

@@ -61,7 +60,6 @@ import org.apache.solr.cloud.rule.ReplicaAssigner;
 import org.apache.solr.cloud.rule.ReplicaAssigner.Position;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
 import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.cloud.rule.SnitchContext;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Aliases;

@@ -106,8 +104,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import com.google.common.collect.ImmutableSet;
-
 
 public class OverseerCollectionProcessor implements Runnable, Closeable {
 

@@ -818,19 +814,42 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
       byte[] bytes = ZkStateReader.toJSON(clusterState);
       Map<String, Object> stateMap = (Map<String,Object>) ZkStateReader.fromJSON(bytes);
 
+      Set<String> collections = new HashSet<>();
+      String routeKey = message.getStr(ShardParams._ROUTE_);
       String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
-      NamedList<Object> collectionProps = new SimpleOrderedMap<Object>();
       if (collection == null) {
-        Set<String> collections = clusterState.getCollections();
+        collections = new HashSet<>(clusterState.getCollections());
+      } else {
+        collections = Collections.singleton(collection);
+      }
+
+      NamedList<Object> collectionProps = new SimpleOrderedMap<Object>();
+
       for (String name : collections) {
         Map<String, Object> collectionStatus = null;
-        if (clusterState.getCollection(name).getStateFormat() > 1) {
-          bytes = ZkStateReader.toJSON(clusterState.getCollection(name));
-          Map<String, Object> docCollection = (Map<String,Object>) ZkStateReader.fromJSON(bytes);
-          collectionStatus = getCollectionStatus(docCollection, name, shard);
-        } else {
-          collectionStatus = getCollectionStatus((Map<String,Object>) stateMap.get(name), name, shard);
+        DocCollection clusterStateCollection = clusterState.getCollection(name);
+
+        Set<String> requestedShards = new HashSet<>();
+        if (routeKey != null) {
+          DocRouter router = clusterStateCollection.getRouter();
+          Collection<Slice> slices = router.getSearchSlices(routeKey, null, clusterStateCollection);
+          for (Slice slice : slices) {
+            requestedShards.add(slice.getName());
           }
+        }
+        if (shard != null) {
+          requestedShards.add(shard);
+        }
+
+        if (clusterStateCollection.getStateFormat() > 1) {
+          bytes = ZkStateReader.toJSON(clusterStateCollection);
+          Map<String, Object> docCollection = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
+          collectionStatus = getCollectionStatus(docCollection, name, requestedShards);
+        } else {
+          collectionStatus = getCollectionStatus((Map<String, Object>) stateMap.get(name), name, requestedShards);
+        }
+
+        collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion());
         if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
           collectionStatus.put("aliases", collectionVsAliases.get(name));
         }

@@ -838,43 +857,6 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
         collectionStatus.put("configName", configName);
         collectionProps.add(name, collectionStatus);
       }
-      } else {
-        String routeKey = message.getStr(ShardParams._ROUTE_);
-        Map<String, Object> docCollection = null;
-        if (clusterState.getCollection(collection).getStateFormat() > 1) {
-          bytes = ZkStateReader.toJSON(clusterState.getCollection(collection));
-          docCollection = (Map<String,Object>) ZkStateReader.fromJSON(bytes);
-        } else {
-          docCollection = (Map<String,Object>) stateMap.get(collection);
-        }
-        if (routeKey == null) {
-          Map<String, Object> collectionStatus = getCollectionStatus(docCollection, collection, shard);
-          if (collectionVsAliases.containsKey(collection) && !collectionVsAliases.get(collection).isEmpty()) {
-            collectionStatus.put("aliases", collectionVsAliases.get(collection));
-          }
-          String configName = zkStateReader.readConfigName(collection);
-          collectionStatus.put("configName", configName);
-          collectionProps.add(collection, collectionStatus);
-        } else {
-          DocCollection coll = clusterState.getCollection(collection);
-          DocRouter router = coll.getRouter();
-          Collection<Slice> slices = router.getSearchSlices(routeKey, null, coll);
-          String s = "";
-          for (Slice slice : slices) {
-            s += slice.getName() + ",";
-          }
-          if (shard != null) {
-            s += shard;
-          }
-          Map<String, Object> collectionStatus = getCollectionStatus(docCollection, collection, s);
-          if (collectionVsAliases.containsKey(collection) && !collectionVsAliases.get(collection).isEmpty()) {
-            collectionStatus.put("aliases", collectionVsAliases.get(collection));
-          }
-          String configName = zkStateReader.readConfigName(collection);
-          collectionStatus.put("configName", configName);
-          collectionProps.add(collection, collectionStatus);
-        }
-      }
 
       List<String> liveNodes = zkStateReader.getZkClient().getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null, true);
 

@@ -946,21 +928,21 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
    *
    * @param collection collection map parsed from JSON-serialized {@link ClusterState}
    * @param name collection name
-   * @param shardStr comma separated shard names
+   * @param requestedShards a set of shards to be returned in the status.
+   *                        An empty or null values indicates <b>all</b> shards.
    * @return map of collection properties
    */
   @SuppressWarnings("unchecked")
-  private Map<String, Object> getCollectionStatus(Map<String, Object> collection, String name, String shardStr) {
+  private Map<String, Object> getCollectionStatus(Map<String, Object> collection, String name, Set<String> requestedShards) {
     if (collection == null) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " not found");
     }
-    if (shardStr == null) {
+    if (requestedShards == null || requestedShards.isEmpty()) {
       return collection;
     } else {
       Map<String, Object> shards = (Map<String, Object>) collection.get("shards");
       Map<String, Object> selected = new HashMap<>();
-      List<String> selectedShards = Arrays.asList(shardStr.split(","));
-      for (String selectedShard : selectedShards) {
+      for (String selectedShard : requestedShards) {
         if (!shards.containsKey(selectedShard)) {
           throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " shard: " + selectedShard + " not found");
         }

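Note (not part of the committed diff): the value exposed above is DocCollection.getZNodeVersion(), i.e. the ZooKeeper version of the state node the collection was read from, so it changes whenever the collection's state is updated. A client can compare the field across two CLUSTERSTATUS responses to detect such a change, which is what the test added below asserts after an ADDREPLICA. A hedged sketch, where "before" and "after" are assumed to be two per-collection maps obtained as in the earlier example:

    // Illustrative only: detect a collection state change between two CLUSTERSTATUS calls.
    Integer versionBefore = (Integer) before.get("znodeVersion");
    Integer versionAfter = (Integer) after.get("znodeVersion");
    boolean stateChanged = versionAfter > versionBefore;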
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;

@@ -80,6 +81,7 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
     clusterStatusAliasTest();
     clusterStatusRolesTest();
     replicaPropTest();
+    clusterStatusZNodeVersion();
   }
 
   private void clusterStatusWithCollectionAndShard() throws IOException, SolrServerException {

@@ -169,6 +171,55 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
     }
   }
 
+  private void clusterStatusZNodeVersion() throws Exception {
+    String cname = "clusterStatusZNodeVersion";
+    try (CloudSolrClient client = createCloudClient(null)) {
+
+      CollectionAdminRequest.Create create = new CollectionAdminRequest.Create();
+      create.setCollectionName(cname);
+      create.setMaxShardsPerNode(1);
+      create.setNumShards(1);
+      create.setReplicationFactor(1);
+      create.setConfigName("conf1");
+      create.process(client);
+
+      waitForRecoveriesToFinish(cname, true);
+
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set("action", CollectionParams.CollectionAction.CLUSTERSTATUS.toString());
+      params.set("collection", cname);
+      SolrRequest request = new QueryRequest(params);
+      request.setPath("/admin/collections");
+
+      NamedList<Object> rsp = client.request(request);
+      NamedList<Object> cluster = (NamedList<Object>) rsp.get("cluster");
+      assertNotNull("Cluster state should not be null", cluster);
+      NamedList<Object> collections = (NamedList<Object>) cluster.get("collections");
+      assertNotNull("Collections should not be null in cluster state", collections);
+      assertEquals(1, collections.size());
+      Map<String, Object> collection = (Map<String, Object>) collections.get(cname);
+      assertNotNull(collection);
+      assertEquals("conf1", collection.get("configName"));
+      Integer znodeVersion = (Integer) collection.get("znodeVersion");
+      assertNotNull(znodeVersion);
+
+      CollectionAdminRequest.AddReplica addReplica = new CollectionAdminRequest.AddReplica();
+      addReplica.setCollectionName(cname);
+      addReplica.setShardName("shard1");
+      addReplica.process(client);
+
+      waitForRecoveriesToFinish(cname, true);
+
+      rsp = client.request(request);
+      cluster = (NamedList<Object>) rsp.get("cluster");
+      collections = (NamedList<Object>) cluster.get("collections");
+      collection = (Map<String, Object>) collections.get(cname);
+      Integer newVersion = (Integer) collection.get("znodeVersion");
+      assertNotNull(newVersion);
+      assertTrue(newVersion > znodeVersion);
+    }
+  }
+
   private void clusterStatusWithRouteKey() throws IOException, SolrServerException {
     try (CloudSolrClient client = createCloudClient(DEFAULT_COLLECTION)) {
       SolrInputDocument doc = new SolrInputDocument();